From cd0e53b1d2ef5e6801b5c237eb1b5d9fa68be18d Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Fri, 10 Apr 2026 23:39:37 -0500 Subject: [PATCH] feat: implement MSSQL extractor, transformer, and Postgres loader for enhanced data migration --- cmd/go_migrate/main.go | 18 ++++- cmd/go_migrate/process.go | 18 ++--- internal/app/etl/extractor/types.go | 36 --------- .../etl/{extractor => extractors}/mssql.go | 5 +- .../etl/{extractor => extractors}/postgres.go | 5 +- internal/app/etl/extractors/types.go | 1 + internal/app/etl/loader/types.go | 30 ------- .../app/etl/{loader => loaders}/postgres.go | 5 +- internal/app/etl/loaders/types.go | 1 + internal/app/etl/transformer/types.go | 33 -------- .../{transformer => transformers}/mssql.go | 35 ++++---- internal/app/etl/transformers/types.go | 1 + .../{transformer => transformers}/utils.go | 2 +- internal/app/etl/types.go | 80 +++++++++++++++++++ 14 files changed, 135 insertions(+), 135 deletions(-) delete mode 100644 internal/app/etl/extractor/types.go rename internal/app/etl/{extractor => extractors}/mssql.go (97%) rename internal/app/etl/{extractor => extractors}/postgres.go (95%) create mode 100644 internal/app/etl/extractors/types.go delete mode 100644 internal/app/etl/loader/types.go rename internal/app/etl/{loader => loaders}/postgres.go (94%) create mode 100644 internal/app/etl/loaders/types.go delete mode 100644 internal/app/etl/transformer/types.go rename internal/app/etl/{transformer => transformers}/mssql.go (76%) create mode 100644 internal/app/etl/transformers/types.go rename internal/app/etl/{transformer => transformers}/utils.go (98%) create mode 100644 internal/app/etl/types.go diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 337e0b3..6afdf01 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -7,6 +7,9 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" ) @@ -87,11 +90,24 @@ func processMigrationJobs( chJobs := make(chan config.Job, len(jobs)) var wgJobs sync.WaitGroup + extractor := extractors.NewMssqlExtractor(sourceDb) + transformer := transformers.NewMssqlTransformer() + loader := loaders.NewPostgresLoader(targetDb) + for i := range maxParallelWorkers { wgJobs.Go(func() { for job := range chJobs { log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) - res := processMigrationJob(ctx, sourceDb, targetDb, job) + res := processMigrationJob( + ctx, + sourceDb, + targetDb, + extractor, + transformer, + loader, + job, + ) + chJobResults <- res } }) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 2599d8d..1641eeb 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -9,12 +9,9 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loader" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformer" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5/pgxpool" - _ "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" ) @@ -23,6 +20,9 @@ func processMigrationJob( ctx context.Context, sourceDb *sql.DB, targetDb *pgxpool.Pool, + extractor etl.Extractor, + transformer etl.Transformer, + loader etl.Loader, job config.Job, ) JobResult { result := JobResult{ @@ -62,10 +62,6 @@ func processMigrationJob( var wgTransformers sync.WaitGroup var wgLoaders sync.WaitGroup - mssqlExtractor := extractor.NewMssqlExtractor(sourceDb) - mssqlToPostgresTransformer := transformer.NewMssqlTransformer() - postgresLoader := loader.NewPostgresLoader(targetDb) - go func() { if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil { cancel() @@ -81,7 +77,7 @@ func processMigrationJob( for range maxExtractors { wgExtractors.Go(func() { - mssqlExtractor.Exec( + extractor.Exec( jobCtx, job.SourceTable, sourceColTypes, @@ -107,7 +103,7 @@ func processMigrationJob( for range maxExtractors { wgTransformers.Go(func() { - mssqlToPostgresTransformer.Exec( + transformer.Exec( jobCtx, sourceColTypes, chChunksRaw, @@ -122,7 +118,7 @@ func processMigrationJob( for range job.MaxLoaders { wgLoaders.Go(func() { - postgresLoader.Exec( + loader.Exec( jobCtx, job.TargetTable, targetColTypes, diff --git a/internal/app/etl/extractor/types.go b/internal/app/etl/extractor/types.go deleted file mode 100644 index 0f85cf7..0000000 --- a/internal/app/etl/extractor/types.go +++ /dev/null @@ -1,36 +0,0 @@ -package extractor - -import ( - "context" - "sync" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" -) - -type Extractor interface { - ProcessBatch( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - chunkSize int, - batch models.Batch, - indexPrimaryKey int, - chChunksOut chan<- models.Chunk, - rowsRead *int64, - ) error - - Exec( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - chunkSize int, - chBatchesIn <-chan models.Batch, - chChunksOut chan<- models.Chunk, - chErrorsOut chan<- custom_errors.ExtractorError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveBatches *sync.WaitGroup, - rowsRead *int64, - ) -} diff --git a/internal/app/etl/extractor/mssql.go b/internal/app/etl/extractors/mssql.go similarity index 97% rename from internal/app/etl/extractor/mssql.go rename to internal/app/etl/extractors/mssql.go index 662171c..1e94a95 100644 --- a/internal/app/etl/extractor/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -1,4 +1,4 @@ -package extractor +package extractors import ( "context" @@ -13,6 +13,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" ) @@ -21,7 +22,7 @@ type MssqlExtractor struct { db *sql.DB } -func NewMssqlExtractor(db *sql.DB) Extractor { +func NewMssqlExtractor(db *sql.DB) etl.Extractor { return &MssqlExtractor{db: db} } diff --git a/internal/app/etl/extractor/postgres.go b/internal/app/etl/extractors/postgres.go similarity index 95% rename from internal/app/etl/extractor/postgres.go rename to internal/app/etl/extractors/postgres.go index 489da10..ece399a 100644 --- a/internal/app/etl/extractor/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -1,4 +1,4 @@ -package extractor +package extractors import ( "context" @@ -10,6 +10,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" @@ -19,7 +20,7 @@ type PostgresExtractor struct { db *pgxpool.Pool } -func NewPostgresExtractor(pool *pgxpool.Pool) Extractor { +func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor { return &PostgresExtractor{db: pool} } diff --git a/internal/app/etl/extractors/types.go b/internal/app/etl/extractors/types.go new file mode 100644 index 0000000..85defa0 --- /dev/null +++ b/internal/app/etl/extractors/types.go @@ -0,0 +1 @@ +package extractors diff --git a/internal/app/etl/loader/types.go b/internal/app/etl/loader/types.go deleted file mode 100644 index d2bff17..0000000 --- a/internal/app/etl/loader/types.go +++ /dev/null @@ -1,30 +0,0 @@ -package loader - -import ( - "context" - "sync" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" -) - -type Loader interface { - ProcessChunk( - ctx context.Context, - tableInfo config.TargetTableInfo, - colNames []string, - chunk models.Chunk, - ) (int, error) - - Exec( - ctx context.Context, - tableInfo config.TargetTableInfo, - columns []models.ColumnType, - chChunksIn <-chan models.Chunk, - chErrorsOut chan<- custom_errors.LoaderError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveChunks *sync.WaitGroup, - rowsLoaded *int64, - ) -} diff --git a/internal/app/etl/loader/postgres.go b/internal/app/etl/loaders/postgres.go similarity index 94% rename from internal/app/etl/loader/postgres.go rename to internal/app/etl/loaders/postgres.go index 4cdc1ac..71c8a47 100644 --- a/internal/app/etl/loader/postgres.go +++ b/internal/app/etl/loaders/postgres.go @@ -1,4 +1,4 @@ -package loader +package loaders import ( "context" @@ -9,6 +9,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" @@ -19,7 +20,7 @@ type PostgresLoader struct { db *pgxpool.Pool } -func NewPostgresLoader(pool *pgxpool.Pool) Loader { +func NewPostgresLoader(pool *pgxpool.Pool) etl.Loader { return &PostgresLoader{db: pool} } diff --git a/internal/app/etl/loaders/types.go b/internal/app/etl/loaders/types.go new file mode 100644 index 0000000..c88d5fb --- /dev/null +++ b/internal/app/etl/loaders/types.go @@ -0,0 +1 @@ +package loaders diff --git a/internal/app/etl/transformer/types.go b/internal/app/etl/transformer/types.go deleted file mode 100644 index 451ce4c..0000000 --- a/internal/app/etl/transformer/types.go +++ /dev/null @@ -1,33 +0,0 @@ -package transformer - -import ( - "context" - "sync" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" -) - -type transformerFunc func(any) (any, error) - -type columnTransformPlan struct { - index int - fn transformerFunc -} - -type Transformer interface { - ProcessChunk( - ctx context.Context, - chunk *models.Chunk, - transformationPlan []columnTransformPlan, - ) error - - Exec( - ctx context.Context, - columns []models.ColumnType, - chChunksIn <-chan models.Chunk, - chChunksOut chan<- models.Chunk, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveChunks *sync.WaitGroup, - ) -} diff --git a/internal/app/etl/transformer/mssql.go b/internal/app/etl/transformers/mssql.go similarity index 76% rename from internal/app/etl/transformer/mssql.go rename to internal/app/etl/transformers/mssql.go index 213e6f0..75ffb4f 100644 --- a/internal/app/etl/transformer/mssql.go +++ b/internal/app/etl/transformers/mssql.go @@ -1,4 +1,4 @@ -package transformer +package transformers import ( "context" @@ -7,24 +7,25 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" ) type MssqlTransformer struct{} -func NewMssqlTransformer() Transformer { +func NewMssqlTransformer() etl.Transformer { return &MssqlTransformer{} } -func computeTransformationPlan(columns []models.ColumnType) []columnTransformPlan { - var plan []columnTransformPlan +func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan { + var plan []etl.ColumnTransformPlan for i, col := range columns { switch col.SystemType() { case "uniqueidentifier": - plan = append(plan, columnTransformPlan{ - index: i, - fn: func(v any) (any, error) { + plan = append(plan, etl.ColumnTransformPlan{ + Index: i, + Fn: func(v any) (any, error) { if b, ok := v.([]byte); ok && b != nil { return mssqlUuidToBigEndian(b) } @@ -33,9 +34,9 @@ func computeTransformationPlan(columns []models.ColumnType) []columnTransformPla }) case "geometry", "geography": - plan = append(plan, columnTransformPlan{ - index: i, - fn: func(v any) (any, error) { + plan = append(plan, etl.ColumnTransformPlan{ + Index: i, + Fn: func(v any) (any, error) { if b, ok := v.([]byte); ok && b != nil { return wkbToEwkbWithSrid(b, 4326) } @@ -44,9 +45,9 @@ func computeTransformationPlan(columns []models.ColumnType) []columnTransformPla }) case "datetime", "datetime2": - plan = append(plan, columnTransformPlan{ - index: i, - fn: func(v any) (any, error) { + plan = append(plan, etl.ColumnTransformPlan{ + Index: i, + Fn: func(v any) (any, error) { if t, ok := v.(time.Time); ok { return ensureUTC(t), nil } @@ -64,7 +65,7 @@ const processChunkCtxCheck = 4096 func (mssqlTr *MssqlTransformer) ProcessChunk( ctx context.Context, chunk *models.Chunk, - transformationPlan []columnTransformPlan, + transformationPlan []etl.ColumnTransformPlan, ) error { for i, rowValues := range chunk.Data { if i%processChunkCtxCheck == 0 { @@ -74,16 +75,16 @@ func (mssqlTr *MssqlTransformer) ProcessChunk( } for _, task := range transformationPlan { - val := rowValues[task.index] + val := rowValues[task.Index] if val == nil { continue } - transformed, err := task.fn(val) + transformed, err := task.Fn(val) if err != nil { return err } - rowValues[task.index] = transformed + rowValues[task.Index] = transformed } } diff --git a/internal/app/etl/transformers/types.go b/internal/app/etl/transformers/types.go new file mode 100644 index 0000000..b3fa142 --- /dev/null +++ b/internal/app/etl/transformers/types.go @@ -0,0 +1 @@ +package transformers diff --git a/internal/app/etl/transformer/utils.go b/internal/app/etl/transformers/utils.go similarity index 98% rename from internal/app/etl/transformer/utils.go rename to internal/app/etl/transformers/utils.go index b6a9653..00b3939 100644 --- a/internal/app/etl/transformer/utils.go +++ b/internal/app/etl/transformers/utils.go @@ -1,4 +1,4 @@ -package transformer +package transformers import ( "encoding/binary" diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go new file mode 100644 index 0000000..acfc0cf --- /dev/null +++ b/internal/app/etl/types.go @@ -0,0 +1,80 @@ +package etl + +import ( + "context" + "sync" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +type Extractor interface { + ProcessBatch( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + chunkSize int, + batch models.Batch, + indexPrimaryKey int, + chChunksOut chan<- models.Chunk, + rowsRead *int64, + ) error + + Exec( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + chunkSize int, + chBatchesIn <-chan models.Batch, + chChunksOut chan<- models.Chunk, + chErrorsOut chan<- custom_errors.ExtractorError, + chJobErrorsOut chan<- custom_errors.JobError, + wgActiveBatches *sync.WaitGroup, + rowsRead *int64, + ) +} + +type TransformerFunc func(any) (any, error) + +type ColumnTransformPlan struct { + Index int + Fn TransformerFunc +} + +type Transformer interface { + ProcessChunk( + ctx context.Context, + chunk *models.Chunk, + transformationPlan []ColumnTransformPlan, + ) error + + Exec( + ctx context.Context, + columns []models.ColumnType, + chChunksIn <-chan models.Chunk, + chChunksOut chan<- models.Chunk, + chJobErrorsOut chan<- custom_errors.JobError, + wgActiveChunks *sync.WaitGroup, + ) +} + +type Loader interface { + ProcessChunk( + ctx context.Context, + tableInfo config.TargetTableInfo, + colNames []string, + chunk models.Chunk, + ) (int, error) + + Exec( + ctx context.Context, + tableInfo config.TargetTableInfo, + columns []models.ColumnType, + chChunksIn <-chan models.Chunk, + chErrorsOut chan<- custom_errors.LoaderError, + chJobErrorsOut chan<- custom_errors.JobError, + wgActiveChunks *sync.WaitGroup, + rowsLoaded *int64, + ) +}