1 Commits

31 changed files with 381 additions and 636 deletions

1
.gitignore vendored
View File

@@ -4,7 +4,6 @@
*.dll *.dll
*.so *.so
*.dylib *.dylib
bin/
# Test binary, built with `go test -c` # Test binary, built with `go test -c`
*.test *.test

View File

@@ -1,80 +0,0 @@
.PHONY: build build-linux build-windows build-all clean help
# Variables
BINARY_NAME=go-migrate
CMD_PATH=./cmd/go_migrate
OUTPUT_DIR=bin
VERSION?=$(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
GIT_COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
# Flags de compilación
LD_FLAGS=-ldflags="-s -w -X main.Version=$(VERSION) -X main.BuildTime=$(BUILD_TIME) -X main.GitCommit=$(GIT_COMMIT)"
# Default: compilar para el SO actual
build: build-$(OS)
ifeq ($(OS),Windows_NT)
build-native: build-windows
else
build-native: build-linux
endif
# Compilar para Linux (sin CGO para máxima compatibilidad)
build-linux:
@echo "Compilando para Linux..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64"
# Compilar para Windows
build-windows:
@echo "Compilando para Windows..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe"
# Compilar para ambas plataformas
build-all: build-linux build-windows
@echo ""
@echo "Binarios compilados:"
@ls -lh $(OUTPUT_DIR)/$(BINARY_NAME)*
# Compilar para Linux arm64 (opcional, para Raspberry Pi, etc.)
build-linux-arm64:
@echo "Compilando para Linux ARM64..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64"
# Limpiar binarios
clean:
@echo "Limpiando binarios..."
@rm -rf $(OUTPUT_DIR)
@echo "Limpieza completada"
# Ayuda
help:
@echo "Comandos disponibles:"
@echo ""
@echo " make build - Compilar para el SO actual (Linux/Windows)"
@echo " make build-linux - Compilar para Linux x86_64"
@echo " make build-windows - Compilar para Windows x86_64"
@echo " make build-linux-arm64 - Compilar para Linux ARM64 (opcional)"
@echo " make build-all - Compilar para Linux y Windows"
@echo " make clean - Eliminar binarios compilados"
@echo " make help - Mostrar esta ayuda"
@echo ""
@echo "Ejemplos de uso:"
@echo " make build-all # Crear binarios para ambas plataformas"
@echo " make build-linux OS= # Crear solo para Linux"
@echo ""

77
cmd/go_migrate/connect.go Normal file
View File

@@ -0,0 +1,77 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func connectToSqlServer() (*sql.DB, error) {
db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
}
return db, nil
}
func connectToPostgres() (*pgxpool.Pool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
pool, err := pgxpool.New(ctx, config.App.TargetDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("Unable to ping postgres: %w", err)
}
return pool, nil
}
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
var sourceDbErr, targetDbErr error
var sourceDb *sql.DB
var targetDb *pgxpool.Pool
var wg sync.WaitGroup
wg.Go(func() {
sourceDb, sourceDbErr = connectToSqlServer()
if sourceDbErr != nil {
log.Error("Unable to connect to source db: ", sourceDbErr)
}
})
wg.Go(func() {
targetDb, targetDbErr = connectToPostgres()
if targetDbErr != nil {
log.Error("Unable to connect to target db: ", targetDbErr)
}
})
wg.Wait()
if sourceDbErr != nil || targetDbErr != nil {
return nil, nil, errors.New("Unable to connect to databases")
}
return sourceDb, targetDb, nil
}

View File

@@ -2,18 +2,18 @@ package main
import ( import (
"context" "context"
"database/sql"
"sync" "sync"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
) )
func main() { func main() {
@@ -33,33 +33,11 @@ func main() {
log.Info("=== Starting migration ===") log.Info("=== Starting migration ===")
var wgConnect errgroup.Group sourceDb, targetDb, connError := connectToDatabases()
var sourceDb, targetDb dbwrapper.DbWrapper if connError != nil {
log.Fatal("Connection error: ", connError)
wgConnect.Go(func() error {
var err error
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
if err != nil {
return err
} }
return nil
})
wgConnect.Go(func() error {
var err error
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
if err := wgConnect.Wait(); err != nil {
log.Error("Connection error: ", err)
return
}
defer sourceDb.Close() defer sourceDb.Close()
defer targetDb.Close() defer targetDb.Close()
@@ -92,14 +70,14 @@ func main() {
func processMigrationJobs( func processMigrationJobs(
ctx context.Context, ctx context.Context,
sourceDb dbwrapper.DbWrapper, sourceDb *sql.DB,
targetDb dbwrapper.DbWrapper, targetDb *pgxpool.Pool,
jobs []config.Job, jobs []config.Job,
maxParallelWorkers int, maxParallelWorkers int,
) []models.JobResult { ) []JobResult {
if len(jobs) == 0 { if len(jobs) == 0 {
log.Info("No migration jobs configured") log.Info("No migration jobs configured")
return []models.JobResult{} return []JobResult{}
} }
if maxParallelWorkers <= 0 { if maxParallelWorkers <= 0 {
@@ -112,15 +90,16 @@ func processMigrationJobs(
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers) log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan models.JobResult, len(jobs)) chJobResults := make(chan JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs)) chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup var wgJobs sync.WaitGroup
targetDbWrapper := db.NewPostgresDbWrapper(targetDb)
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer() transformer := transformers.NewMssqlTransformer()
loader := loaders.NewGenericLoader(targetDb) loader := loaders.NewPostgresLoader(targetDb)
for i := range maxParallelWorkers { for i := range maxParallelWorkers {
wgJobs.Go(func() { wgJobs.Go(func() {
@@ -128,7 +107,7 @@ func processMigrationJobs(
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
res := processMigrationJob( res := processMigrationJob(
ctx, ctx,
targetDb, targetDbWrapper,
sourceTableAnalyzer, sourceTableAnalyzer,
targetTableAnalyzer, targetTableAnalyzer,
extractor, extractor,
@@ -152,26 +131,10 @@ func processMigrationJobs(
close(chJobResults) close(chJobResults)
}() }()
var finalResults []models.JobResult var finalResults []JobResult
for res := range chJobResults { for res := range chJobResults {
finalResults = append(finalResults, res) finalResults = append(finalResults, res)
} }
return finalResults return finalResults
} }
func connectWithTimeout(ctx context.Context, dbType string, dbUrl string, timeout time.Duration) (dbwrapper.DbWrapper, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sourceDb, err := dbwrapper.New(dbType)
if err != nil {
return nil, err
}
if err = sourceDb.Connect(localCtx, dbUrl); err != nil {
return nil, err
}
return sourceDb, nil
}

13
cmd/go_migrate/metrics.go Normal file
View File

@@ -0,0 +1,13 @@
package main
import "time"
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}

View File

@@ -2,14 +2,13 @@ package main
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
@@ -19,18 +18,19 @@ import (
func processMigrationJob( func processMigrationJob(
ctx context.Context, ctx context.Context,
targetDbWrapper dbwrapper.DbWrapper, // sourceDbWrapper db.DbWrapper,
targetDbWrapper db.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer, sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer, targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor, extractor etl.Extractor,
transformer etl.Transformer, transformer etl.Transformer,
loader etl.Loader, loader etl.Loader,
job config.Job, job config.Job,
) models.JobResult { ) JobResult {
localCtx, cancel := context.WithCancel(ctx) localCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
result := models.JobResult{ result := JobResult{
JobName: job.Name, JobName: job.Name,
StartTime: time.Now(), StartTime: time.Now(),
} }
@@ -234,9 +234,5 @@ func processMigrationJob(
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded) result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
result.RowsFailed = atomic.LoadInt64(&rowsFailed) result.RowsFailed = atomic.LoadInt64(&rowsFailed)
if result.RowsRead != result.RowsLoaded {
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
}
return result return result
} }

View File

@@ -1,6 +1,6 @@
max_parallel_workers: 4 max_parallel_workers: 4
source_db_type: sqlserver source_db_type: sqlserver
target_db_type: sqlserver target_db_type: postgres
defaults: defaults:
max_extractors: 2 max_extractors: 2
@@ -30,12 +30,6 @@ jobs:
table: MANZANA table: MANZANA
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
# - 'TRUNCATE TABLE "Cartografia"."MANZANA"'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto - name: red_puerto
enabled: true enabled: true
@@ -48,6 +42,5 @@ jobs:
table: PUERTO table: PUERTO
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
# - 'TRUNCATE TABLE "Red"."PUERTO"'
post_sql: post_sql:
- "SELECT 1" - "SELECT 1"

2
go.mod
View File

@@ -15,6 +15,8 @@ require (
) )
require ( require (
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect

4
go.sum
View File

@@ -16,6 +16,10 @@ github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZ
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -50,12 +50,6 @@ type Job struct {
PreSQL []string `yaml:"pre_sql"` PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"` PostSQL []string `yaml:"post_sql"`
JobConfig `yaml:",inline"` JobConfig `yaml:",inline"`
Range struct {
Min int64 `yaml:"min"`
Max int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
} }
type MigrationConfig struct { type MigrationConfig struct {
@@ -82,8 +76,6 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
c.MaxParallelWorkers = raw.MaxParallelWorkers c.MaxParallelWorkers = raw.MaxParallelWorkers
c.Defaults = raw.Defaults c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition) c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs { for _, node := range raw.Jobs {

View File

@@ -103,8 +103,8 @@ func ExtractorErrorHandler(
if err.HasLastId { if err.HasLastId {
newPartition.ParentId = err.Partition.Id newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New() newPartition.Id = uuid.New()
newPartition.Range.Min = err.LastId newPartition.LowerLimit = err.LastId
newPartition.Range.IsMinInclusive = false newPartition.IsLowerLimitInclusive = false
} }
requeueWithBackoff(ctx, delay, func() { requeueWithBackoff(ctx, delay, func() {

View File

@@ -1,19 +0,0 @@
package dbwrapper
import "fmt"
type Factory func() DbWrapper
var drivers = make(map[string]Factory)
func Register(name string, factory Factory) {
drivers[name] = factory
}
func New(driverType string) (DbWrapper, error) {
factory, ok := drivers[driverType]
if !ok {
return nil, fmt.Errorf("driver not yet supported: %s", driverType)
}
return factory(), nil
}

View File

@@ -1,176 +0,0 @@
package dbwrapper
import (
"context"
"database/sql"
"fmt"
mssql "github.com/microsoft/go-mssqldb"
)
func init() {
Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
})
}
type mssqlRowResult struct {
row *sql.Row
}
func (mr *mssqlRowResult) Scan(dest ...any) error {
return mr.row.Scan(dest...)
}
type mssqlRowsResult struct {
columns []string
rows *sql.Rows
}
func (mr *mssqlRowsResult) Close() error {
return mr.rows.Close()
}
func (mr *mssqlRowsResult) Columns() ([]string, error) {
if mr.columns != nil {
return mr.columns, nil
}
return mr.rows.Columns()
}
func (mr *mssqlRowsResult) Err() error {
return mr.rows.Err()
}
func (mr *mssqlRowsResult) Next() bool {
return mr.rows.Next()
}
func (mr *mssqlRowsResult) Scan(dest ...any) error {
return mr.rows.Scan(dest...)
}
func (mr *mssqlRowsResult) Values() ([]any, error) {
columns, err := mr.Columns()
if err != nil {
return nil, err
}
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := mr.rows.Scan(scanArgs...); err != nil {
return nil, err
}
return rowValues, nil
}
type mssqlDbWrapper struct {
db *sql.DB
dialect string
}
func (mw *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error {
db, err := sql.Open("sqlserver", dbUrl)
if err != nil {
return err
}
if err := db.PingContext(ctx); err != nil {
if err := db.Close(); err != nil {
return err
}
return err
}
mw.db = db
return nil
}
func (mw *mssqlDbWrapper) Close() error {
return mw.db.Close()
}
func (mw *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, execErr := mw.db.ExecContext(ctx, query, args...)
if execErr != nil {
return ExecResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: affectedRows}, nil
}
func (mw *mssqlDbWrapper) GetDialect() string {
return mw.dialect
}
func (mw *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := mw.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &mssqlRowsResult{columns: nil, rows: rows}, nil
}
func (mw *mssqlDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := mw.db.QueryRowContext(ctx, query, args...)
return &mssqlRowResult{row: row}
}
func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
tx, err := mw.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
fullTableName := fmt.Sprintf("[%s].[%s]", schema, table)
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, columnNames...))
if err != nil {
tx.Rollback()
return 0, err
}
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
if err := stmt.Close(); err != nil {
tx.Rollback()
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
rowsAffected, raErr := result.RowsAffected()
if raErr != nil {
return 0, nil
}
return rowsAffected, nil
}

View File

@@ -1,128 +0,0 @@
package dbwrapper
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func init() {
Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
})
}
type postgresRowResult struct {
row pgx.Row
}
func (pr *postgresRowResult) Scan(dest ...any) error {
return pr.row.Scan(dest...)
}
type postgresRowsResult struct {
columns []string
rows pgx.Rows
}
func (pr *postgresRowsResult) Close() error {
pr.rows.Close()
return nil
}
func (pr *postgresRowsResult) Columns() ([]string, error) {
if pr.columns != nil {
return pr.columns, nil
}
rawColumns := pr.rows.FieldDescriptions()
if rawColumns == nil {
return nil, errors.New("error retrieving columns")
}
columns := make([]string, 0, len(rawColumns))
for _, rc := range rawColumns {
columns = append(columns, rc.Name)
}
return columns, nil
}
func (pr *postgresRowsResult) Err() error {
return pr.rows.Err()
}
func (pr *postgresRowsResult) Next() bool {
return pr.rows.Next()
}
func (pr *postgresRowsResult) Scan(dest ...any) error {
return pr.rows.Scan(dest...)
}
func (pr *postgresRowsResult) Values() ([]any, error) {
return pr.rows.Values()
}
type postgresDbWrapper struct {
db *pgxpool.Pool
dialect string
}
func (pw *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error {
pool, err := pgxpool.New(ctx, dbUrl)
if err != nil {
return err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return err
}
pw.db = pool
return nil
}
func (pw *postgresDbWrapper) Close() error {
pw.db.Close()
return nil
}
func (pw *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, err := pw.db.Exec(ctx, query, args...)
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: result.RowsAffected()}, nil
}
func (pw *postgresDbWrapper) GetDialect() string {
return pw.dialect
}
func (pw *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := pw.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
return &postgresRowsResult{columns: nil, rows: rows}, nil
}
func (pw *postgresDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := pw.db.QueryRow(ctx, query, args...)
return &postgresRowResult{row: row}
}
func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
affectedRows, err := pw.db.CopyFrom(ctx, pgx.Identifier{schema, table}, columnNames, pgx.CopyFromRows(rows))
if err != nil {
return 0, err
}
return affectedRows, nil
}

View File

@@ -1,35 +0,0 @@
package dbwrapper
import (
"context"
"errors"
)
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
type ExecResult struct {
AffectedRows int64
}
type RowsResult interface {
Close() error
Columns() ([]string, error)
Err() error
Next() bool
Scan(dest ...any) error
Values() ([]any, error)
}
type RowResult interface {
Scan(dest ...any) error
}
type DbWrapper interface {
Close() error
Connect(ctx context.Context, dbUrl string) error
Exec(ctx context.Context, query string, args ...any) (ExecResult, error)
GetDialect() string
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
}

30
internal/app/db/mssql.go Normal file
View File

@@ -0,0 +1,30 @@
package db
import (
"context"
"database/sql"
)
type MssqlDbWrapper struct {
db *sql.DB
}
func NewMssqlDbWrapper(db *sql.DB) DbWrapper {
return &MssqlDbWrapper{db: db}
}
func (wrapper *MssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, execErr := wrapper.db.ExecContext(ctx, query, args...)
if execErr != nil {
return DbWrapperResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: affectedRows,
}, nil
}

View File

@@ -0,0 +1,47 @@
package db
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
type PostgresDbWrapper struct {
db *pgxpool.Pool
}
func NewPostgresDbWrapper(db *pgxpool.Pool) DbWrapper {
return &PostgresDbWrapper{db: db}
}
func (wrapper *PostgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, err := wrapper.db.Exec(ctx, query, args...)
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: result.RowsAffected(),
}, nil
}

11
internal/app/db/types.go Normal file
View File

@@ -0,0 +1,11 @@
package db
import "context"
type DbWrapperResult struct {
AffectedRows int64
}
type DbWrapper interface {
Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error)
}

View File

@@ -13,17 +13,16 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
type MssqlExtractor struct { type MssqlExtractor struct {
db dbwrapper.DbWrapper db *sql.DB
} }
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor { func NewMssqlExtractor(db *sql.DB) etl.Extractor {
return &MssqlExtractor{db: db} return &MssqlExtractor{db: db}
} }
@@ -43,9 +42,9 @@ func buildExtractQueryMssql(
for i, col := range columns { for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name()) fmt.Fprintf(&sbQuery, "[%s]", col.Name())
// if col.Type() == "GEOMETRY" { if col.Type() == "GEOMETRY" {
// fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name()) fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
// } }
if i < len(columns)-1 { if i < len(columns)-1 {
sbQuery.WriteString(", ") sbQuery.WriteString(", ")
@@ -108,18 +107,18 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int, error) { ) (int, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive) query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive)
var queryArgs []any var queryArgs []any
if partition.HasRange { if partition.ShouldUseRange {
queryArgs = append(queryArgs, queryArgs = append(queryArgs,
sql.Named("min", partition.Range.Min), sql.Named("min", partition.LowerLimit),
sql.Named("max", partition.Range.Max), sql.Named("max", partition.UpperLimit),
) )
} }
rowsRead := 0 rowsRead := 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...) rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...)
if err != nil { if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
} }

View File

@@ -9,18 +9,18 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresExtractor struct { type PostgresExtractor struct {
db dbwrapper.DbWrapper db *pgxpool.Pool
} }
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor { func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor {
return &PostgresExtractor{db: db} return &PostgresExtractor{db: pool}
} }
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string { func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
@@ -62,7 +62,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
) (int, error) { ) (int, error) {
query := buildExtractQueryPostgres(tableInfo, columns) query := buildExtractQueryPostgres(tableInfo, columns)
if partition.HasRange { if partition.ShouldUseRange {
return 0, errors.New("Batch config not yet supported") return 0, errors.New("Batch config not yet supported")
} }

View File

@@ -0,0 +1 @@
package extractors

View File

@@ -9,32 +9,43 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
) )
type GenericLoader struct { type PostgresLoader struct {
db dbwrapper.DbWrapper db *pgxpool.Pool
} }
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader { func NewPostgresLoader(pool *pgxpool.Pool) etl.Loader {
return &GenericLoader{db: db} return &PostgresLoader{db: pool}
} }
func (gl *GenericLoader) ProcessBatch( func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
colNames []string, colNames []string,
batch models.Batch, batch models.Batch,
) (int, error) { ) (int, error) {
_, err := gl.db.SaveMassive( tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
_, err := postgresLd.db.CopyFrom(
ctx, ctx,
tableInfo.Schema, tableId,
tableInfo.Table,
colNames, colNames,
batch.Rows, pgx.CopyFromRows(batch.Rows),
) )
if err != nil { if err != nil {
@@ -43,7 +54,7 @@ func (gl *GenericLoader) ProcessBatch(
if pgErr.Code == "23505" { if pgErr.Code == "23505" {
return 0, &custom_errors.JobError{ return 0, &custom_errors.JobError{
ShouldCancelJob: true, ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table), Msg: fmt.Sprintf("Fatal error in table %s", tableId.Sanitize()),
Prev: err, Prev: err,
} }
} }
@@ -55,7 +66,7 @@ func (gl *GenericLoader) ProcessBatch(
return len(batch.Rows), nil return len(batch.Rows), nil
} }
func (gl *GenericLoader) Exec( func (postgresLd *PostgresLoader) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -82,7 +93,7 @@ func (gl *GenericLoader) Exec(
return return
} }
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch) processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil { if err != nil {
var ldError *custom_errors.LoaderError var ldError *custom_errors.LoaderError

View File

@@ -0,0 +1 @@
package loaders

View File

@@ -1,11 +0,0 @@
package loaders
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}

View File

@@ -24,7 +24,7 @@ func PartitionRangeGenerator(
if rowsCount <= rowsPerPartition { if rowsCount <= rowsPerPartition {
return []models.Partition{{ return []models.Partition{{
Id: uuid.New(), Id: uuid.New(),
HasRange: false, ShouldUseRange: false,
RetryCounter: 0, RetryCounter: 0,
}}, nil }}, nil

View File

@@ -8,17 +8,16 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
type MssqlTableAnalyzer struct { type MssqlTableAnalyzer struct {
db dbwrapper.DbWrapper db *sql.DB
} }
func NewMssqlTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer { func NewMssqlTableAnalyzer(db *sql.DB) etl.TableAnalyzer {
return &MssqlTableAnalyzer{db: db} return &MssqlTableAnalyzer{db: db}
} }
@@ -36,11 +35,9 @@ JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%')) WHERE s.name = @schema AND st.name = @table AND c.name NOT LIKE 'graph_id%'
ORDER BY c.column_id;` ORDER BY c.column_id;`
// AND c.name NOT LIKE '$%'
type rawColumnMssql struct { type rawColumnMssql struct {
name string name string
userType string userType string
@@ -145,7 +142,7 @@ func (ta *MssqlTableAnalyzer) QueryColumnTypes(
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second) localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel() defer cancel()
rows, err := ta.db.Query(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)) rows, err := ta.db.QueryContext(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -186,11 +183,11 @@ JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1) WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name` GROUP BY t.name`
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel() defer cancel()
var rowsCount int64 var rowsCount int64
err := ta.db.QueryRow(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount) err := ta.db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -218,10 +215,10 @@ ORDER BY batch_id`,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table) tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel() defer cancel()
rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) rows, err := ta.db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -232,14 +229,12 @@ ORDER BY batch_id`,
for rows.Next() { for rows.Next() {
partition := models.Partition{ partition := models.Partition{
Id: uuid.New(), Id: uuid.New(),
HasRange: true, ShouldUseRange: true,
RetryCounter: 0, RetryCounter: 0,
Range: models.PartitionRange{ IsLowerLimitInclusive: true,
IsMinInclusive: true,
},
} }
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil { if err := rows.Scan(&partition.LowerLimit, &partition.UpperLimit); err != nil {
return nil, err return nil, err
} }

View File

@@ -6,16 +6,16 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresTableAnalyzer struct { type PostgresTableAnalyzer struct {
db dbwrapper.DbWrapper db *pgxpool.Pool
} }
func NewPostgresTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer { func NewPostgresTableAnalyzer(db *pgxpool.Pool) etl.TableAnalyzer {
return &PostgresTableAnalyzer{db: db} return &PostgresTableAnalyzer{db: db}
} }
@@ -125,7 +125,7 @@ func (ta *PostgresTableAnalyzer) QueryColumnTypes(
ctx context.Context, ctx context.Context,
tableInfo config.TableInfo, tableInfo config.TableInfo,
) ([]models.ColumnType, error) { ) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel() defer cancel()
rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table) rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table)

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"sync" "sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
@@ -17,7 +18,46 @@ func NewMssqlTransformer() etl.Transformer {
} }
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan { func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
return []etl.ColumnTransformPlan{} var plan []etl.ColumnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
} }
const processBatchCtxCheck = 4096 const processBatchCtxCheck = 4096

View File

@@ -1,10 +1,6 @@
package models package models
import ( import "github.com/google/uuid"
"time"
"github.com/google/uuid"
)
type UnknownRowValues = []any type UnknownRowValues = []any
@@ -15,27 +11,12 @@ type Batch struct {
RetryCounter int RetryCounter int
} }
type PartitionRange struct {
Min int64
Max int64
IsMinInclusive bool
IsMaxInclusive bool
}
type Partition struct { type Partition struct {
Id uuid.UUID Id uuid.UUID
ParentId uuid.UUID ParentId uuid.UUID
Range PartitionRange LowerLimit int64
HasRange bool UpperLimit int64
IsLowerLimitInclusive bool
ShouldUseRange bool
RetryCounter int RetryCounter int
} }
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}

View File

@@ -0,0 +1,58 @@
package main
import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"github.com/cenkalti/backoff/v5"
)
func ExampleRetry() {
// Define an operation function that returns a value and an error.
// The value can be any type.
// We'll pass this operation to Retry function.
operation := func() (string, error) {
// An example request that may fail.
resp, err := http.Get("http://httpbin.org/get")
if err != nil {
return "", err
}
defer resp.Body.Close()
// If we are being rate limited, return a RetryAfter to specify how long to wait.
// This will also reset the backoff policy.
if resp.StatusCode == 429 {
seconds, err := strconv.ParseInt(resp.Header.Get("Retry-After"), 10, 64)
if err == nil {
return "", backoff.RetryAfter(int(seconds))
}
}
// In case of non-retriable error, return Permanent error to stop retrying.
// For this HTTP example, client errors are non-retriable.
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
return "", backoff.Permanent(errors.New("bad request"))
}
// Return successful response.
return "hello", nil
}
result, err := backoff.Retry(context.TODO(), operation, backoff.WithBackOff(backoff.NewExponentialBackOff()))
if err != nil {
fmt.Println("Error:", err)
return
}
// Operation is successful after retries.
fmt.Println(result)
// Output: hello
}
func main() {
ExampleRetry()
}

View File

@@ -8,32 +8,13 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
func main() { func main() {
log.SetFormatter(&log.TextFormatter{ log.SetFormatter(&log.TextFormatter{
FullTimestamp: true, FullTimestamp: true,
@@ -46,8 +27,8 @@ func main() {
ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second) ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel() defer cancel()
sourcePool, err := Connect(ctxSource, config.App.SourceDbUrl) sourcePool, err := db.Connect(ctxSource, config.App.SourceDbUrl)
defer Close(sourcePool) defer db.Close(sourcePool)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -56,8 +37,8 @@ func main() {
ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second) ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel() defer cancel()
targetPool, err := Connect(ctxTarget, config.App.TargetDbUrl) targetPool, err := db.Connect(ctxTarget, config.App.TargetDbUrl)
defer Close(targetPool) defer db.Close(targetPool)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }