diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 0a7b800..f22f186 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -10,6 +10,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformer" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5/pgxpool" @@ -60,6 +61,9 @@ func processMigrationJob( var wgTransformers sync.WaitGroup var wgLoaders sync.WaitGroup + mssqlExtractor := extractor.NewMssqlExtractor(sourceDb) + mssqlToPostgresTransformer := transformer.NewMssqlTransformer() + go func() { if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil { cancel() @@ -73,11 +77,9 @@ func processMigrationJob( maxExtractors := min(job.MaxExtractors, len(batches)) log.Infof("Starting %d extractor(s)...", maxExtractors) - exMssql := extractor.NewMssqlExtractor(sourceDb) - for range maxExtractors { wgExtractors.Go(func() { - exMssql.Exec( + mssqlExtractor.Exec( jobCtx, job.SourceTable, sourceColTypes, @@ -103,7 +105,14 @@ func processMigrationJob( for range maxExtractors { wgTransformers.Go(func() { - transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks) + mssqlToPostgresTransformer.Exec( + jobCtx, + sourceColTypes, + chChunksRaw, + chChunksTransformed, + chJobErrors, + &wgActiveChunks, + ) }) } diff --git a/internal/app/etl/extractor/main.go b/internal/app/etl/extractor/types.go similarity index 100% rename from internal/app/etl/extractor/main.go rename to internal/app/etl/extractor/types.go diff --git a/cmd/go_migrate/transformer.go b/internal/app/etl/transformer/mssql.go similarity index 83% rename from cmd/go_migrate/transformer.go rename to internal/app/etl/transformer/mssql.go index 2ceba6f..ef6dc0a 100644 --- a/cmd/go_migrate/transformer.go +++ b/internal/app/etl/transformer/mssql.go @@ -1,4 +1,4 @@ -package main +package transformer import ( "context" @@ -8,76 +8,12 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - log "github.com/sirupsen/logrus" ) -type transformerFunc func(any) (any, error) +type MssqlTransformer struct{} -type columnTransformPlan struct { - index int - fn transformerFunc -} - -func transformRowsMssql( - ctx context.Context, - columns []models.ColumnType, - chChunksIn <-chan models.Chunk, - chChunksOut chan<- models.Chunk, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveChunks *sync.WaitGroup, -) { - transformationPlan := computeTransformationPlan(columns) - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - - case chunk, ok := <-chChunksIn: - if !ok { - return - } - - if len(transformationPlan) == 0 { - select { - case chChunksOut <- chunk: - wgActiveChunks.Add(1) - continue - case <-ctx.Done(): - return - } - } - - chunkStartTime := time.Now() - - err := processChunk(ctx, &chunk, transformationPlan) - if err != nil { - if errors.Is(err, ctx.Err()) { - return - } - - select { - case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: - case <-ctx.Done(): - } - return - } - - log.Infof("Transformed chunk %s: %d rows in %v", chunk.Id, len(chunk.Data), time.Since(chunkStartTime)) - - select { - case chChunksOut <- chunk: - case <-ctx.Done(): - return - } - - wgActiveChunks.Add(1) - } - } +func NewMssqlTransformer() *MssqlTransformer { + return &MssqlTransformer{} } func computeTransformationPlan(columns []models.ColumnType) []columnTransformPlan { @@ -125,7 +61,11 @@ func computeTransformationPlan(columns []models.ColumnType) []columnTransformPla const processChunkCtxCheck = 4096 -func processChunk(ctx context.Context, chunk *models.Chunk, transformationPlan []columnTransformPlan) error { +func (mssqlTr *MssqlTransformer) ProcessChunk( + ctx context.Context, + chunk *models.Chunk, + transformationPlan []columnTransformPlan, +) error { for i, rowValues := range chunk.Data { if i%processChunkCtxCheck == 0 { if err := ctx.Err(); err != nil { @@ -149,3 +89,61 @@ func processChunk(ctx context.Context, chunk *models.Chunk, transformationPlan [ return nil } + +func (mssqlTr *MssqlTransformer) Exec( + ctx context.Context, + columns []models.ColumnType, + chChunksIn <-chan models.Chunk, + chChunksOut chan<- models.Chunk, + chJobErrorsOut chan<- custom_errors.JobError, + wgActiveChunks *sync.WaitGroup, +) { + transformationPlan := computeTransformationPlan(columns) + + for { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + return + + case chunk, ok := <-chChunksIn: + if !ok { + return + } + + if len(transformationPlan) == 0 { + select { + case chChunksOut <- chunk: + wgActiveChunks.Add(1) + continue + case <-ctx.Done(): + return + } + } + + err := mssqlTr.ProcessChunk(ctx, &chunk, transformationPlan) + if err != nil { + if errors.Is(err, ctx.Err()) { + return + } + + select { + case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: + case <-ctx.Done(): + } + return + } + + select { + case chChunksOut <- chunk: + case <-ctx.Done(): + return + } + + wgActiveChunks.Add(1) + } + } +} diff --git a/internal/app/etl/transformer/types.go b/internal/app/etl/transformer/types.go new file mode 100644 index 0000000..451ce4c --- /dev/null +++ b/internal/app/etl/transformer/types.go @@ -0,0 +1,33 @@ +package transformer + +import ( + "context" + "sync" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +type transformerFunc func(any) (any, error) + +type columnTransformPlan struct { + index int + fn transformerFunc +} + +type Transformer interface { + ProcessChunk( + ctx context.Context, + chunk *models.Chunk, + transformationPlan []columnTransformPlan, + ) error + + Exec( + ctx context.Context, + columns []models.ColumnType, + chChunksIn <-chan models.Chunk, + chChunksOut chan<- models.Chunk, + chJobErrorsOut chan<- custom_errors.JobError, + wgActiveChunks *sync.WaitGroup, + ) +} diff --git a/cmd/go_migrate/mssql-transform.go b/internal/app/etl/transformer/utils.go similarity index 98% rename from cmd/go_migrate/mssql-transform.go rename to internal/app/etl/transformer/utils.go index a5ae098..b6a9653 100644 --- a/cmd/go_migrate/mssql-transform.go +++ b/internal/app/etl/transformer/utils.go @@ -1,4 +1,4 @@ -package main +package transformer import ( "encoding/binary"