From 1bc7b676439633ff48560f1e61df1c32080631d9 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Sat, 9 May 2026 00:30:49 -0500 Subject: [PATCH] refactor: replace Exec with Consume method in MssqlTransformer; enhance retry handling and streamline transformation logic --- cmd/go_migrate/process.go | 3 +- internal/app/etl/transformers/consume.go | 82 +++++++ internal/app/etl/transformers/mssql.go | 204 ------------------ internal/app/etl/transformers/plan.go | 122 +++++++++++ .../etl/transformers/process-with-retries.go | 73 +++++++ internal/app/etl/types.go | 11 +- 6 files changed, 282 insertions(+), 213 deletions(-) create mode 100644 internal/app/etl/transformers/consume.go create mode 100644 internal/app/etl/transformers/plan.go create mode 100644 internal/app/etl/transformers/process-with-retries.go diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 5c48c87..57c40c9 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -161,9 +161,10 @@ func processMigrationJob( for range maxExtractors { wgTransformers.Go(func() { - transformer.Exec( + transformer.Consume( localCtx, sourceColTypes, + job.Retry, chBatchesRaw, chBatchesTransformed, chJobErrors, diff --git a/internal/app/etl/transformers/consume.go b/internal/app/etl/transformers/consume.go new file mode 100644 index 0000000..f8545a0 --- /dev/null +++ b/internal/app/etl/transformers/consume.go @@ -0,0 +1,82 @@ +package transformers + +import ( + "context" + "errors" + "sync" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +func (mssqlTr *MssqlTransformer) Consume( + ctx context.Context, + columns []models.ColumnType, + retryConfig config.RetryConfig, + chBatchesIn <-chan models.Batch, + chBatchesOut chan<- models.Batch, + chJobErrorsOut chan<- custom_errors.JobError, + wgActiveBatches *sync.WaitGroup, +) { + transformationPlan := computeTransformationPlan(columns) + storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable) + transformationPlan = append(transformationPlan, storagePlan...) + + for { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + return + + case batch, ok := <-chBatchesIn: + if !ok { + return + } + + if len(transformationPlan) == 0 { + select { + case chBatchesOut <- batch: + wgActiveBatches.Add(1) + continue + case <-ctx.Done(): + return + } + } + + err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig) + if err != nil { + if errors.Is(err, ctx.Err()) { + return + } + + if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { + select { + case chJobErrorsOut <- *jobError: + case <-ctx.Done(): + return + } + } else { + select { + case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: + case <-ctx.Done(): + return + } + } + + return + } + + select { + case chBatchesOut <- batch: + case <-ctx.Done(): + return + } + + wgActiveBatches.Add(1) + } + } +} diff --git a/internal/app/etl/transformers/mssql.go b/internal/app/etl/transformers/mssql.go index 0c238e5..da0710c 100644 --- a/internal/app/etl/transformers/mssql.go +++ b/internal/app/etl/transformers/mssql.go @@ -1,20 +1,9 @@ package transformers import ( - "context" - "errors" - "fmt" - "strings" - "sync" - "time" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/google/uuid" - log "github.com/sirupsen/logrus" ) type MssqlTransformer struct { @@ -30,196 +19,3 @@ func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.So azureClient: azureClient, } } - -func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan { - var plan []etl.ColumnTransformPlan - - for i, col := range columns { - switch col.SystemType() { - case "uniqueidentifier": - plan = append(plan, etl.ColumnTransformPlan{ - Index: i, - Fn: func(v any) (any, error) { - if b, ok := v.([]byte); ok && b != nil { - return mssqlUuidToBigEndian(b) - } - return v, nil - }, - }) - - case "geometry", "geography": - plan = append(plan, etl.ColumnTransformPlan{ - Index: i, - Fn: func(v any) (any, error) { - if b, ok := v.([]byte); ok && b != nil { - return wkbToEwkbWithSrid(b, 4326) - } - return v, nil - }, - }) - - case "datetime", "datetime2": - plan = append(plan, etl.ColumnTransformPlan{ - Index: i, - Fn: func(v any) (any, error) { - if t, ok := v.(time.Time); ok { - return ensureUTC(t), nil - } - return v, nil - }, - }) - } - } - - return plan -} - -func computeStorageTransformationPlan( - ctx context.Context, - azureClient *azure.Client, - toStorage config.ToStorageConfig, - sourceColumns []models.ColumnType, - sourceTable config.SourceTableInfo, -) []etl.ColumnTransformPlan { - if azureClient == nil || len(toStorage.Columns) == 0 { - return nil - } - - colIndex := make(map[string]int, len(sourceColumns)) - for i, col := range sourceColumns { - colIndex[strings.ToUpper(col.Name())] = i - } - - var plan []etl.ColumnTransformPlan - for _, storageCol := range toStorage.Columns { - if storageCol.Mode != "REFERENCE_ONLY" { - log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source) - continue - } - - idx, ok := colIndex[strings.ToUpper(storageCol.Source)] - if !ok { - log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source) - continue - } - - sourceColName := storageCol.Source - schema := sourceTable.Schema - table := sourceTable.Table - - plan = append(plan, etl.ColumnTransformPlan{ - Index: idx, - Fn: func(v any) (any, error) { - if v == nil { - return nil, nil - } - b, ok := v.([]byte) - if !ok { - log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through", - schema, table, sourceColName, v) - return v, nil - } - start := time.Now() - blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String()) - blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b) - if err != nil { - return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err) - } - log.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds()) - return blobURL, nil - }, - }) - } - return plan -} - -const processBatchCtxCheck = 4096 - -func (mssqlTr *MssqlTransformer) ProcessBatch( - ctx context.Context, - batch *models.Batch, - transformationPlan []etl.ColumnTransformPlan, -) error { - for i, rowValues := range batch.Rows { - if i%processBatchCtxCheck == 0 { - if err := ctx.Err(); err != nil { - return err - } - } - - for _, task := range transformationPlan { - val := rowValues[task.Index] - if val == nil { - continue - } - - transformed, err := task.Fn(val) - if err != nil { - return err - } - rowValues[task.Index] = transformed - } - } - - return nil -} - -func (mssqlTr *MssqlTransformer) Exec( - ctx context.Context, - columns []models.ColumnType, - chBatchesIn <-chan models.Batch, - chBatchesOut chan<- models.Batch, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveBatches *sync.WaitGroup, -) { - transformationPlan := computeTransformationPlan(columns) - storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable) - transformationPlan = append(transformationPlan, storagePlan...) - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - - case batch, ok := <-chBatchesIn: - if !ok { - return - } - - if len(transformationPlan) == 0 { - select { - case chBatchesOut <- batch: - wgActiveBatches.Add(1) - continue - case <-ctx.Done(): - return - } - } - - err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan) - if err != nil { - if errors.Is(err, ctx.Err()) { - return - } - - select { - case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: - case <-ctx.Done(): - } - return - } - - select { - case chBatchesOut <- batch: - case <-ctx.Done(): - return - } - - wgActiveBatches.Add(1) - } - } -} diff --git a/internal/app/etl/transformers/plan.go b/internal/app/etl/transformers/plan.go new file mode 100644 index 0000000..435bd46 --- /dev/null +++ b/internal/app/etl/transformers/plan.go @@ -0,0 +1,122 @@ +package transformers + +import ( + "context" + "fmt" + "strings" + "time" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" + "github.com/sirupsen/logrus" +) + +func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan { + var plan []etl.ColumnTransformPlan + + for i, col := range columns { + switch col.SystemType() { + case "uniqueidentifier": + plan = append(plan, etl.ColumnTransformPlan{ + Index: i, + Fn: func(v any) (any, error) { + if b, ok := v.([]byte); ok && b != nil { + return mssqlUuidToBigEndian(b) + } + return v, nil + }, + }) + + case "geometry", "geography": + plan = append(plan, etl.ColumnTransformPlan{ + Index: i, + Fn: func(v any) (any, error) { + if b, ok := v.([]byte); ok && b != nil { + return wkbToEwkbWithSrid(b, 4326) + } + return v, nil + }, + }) + + case "datetime", "datetime2": + plan = append(plan, etl.ColumnTransformPlan{ + Index: i, + Fn: func(v any) (any, error) { + if t, ok := v.(time.Time); ok { + return ensureUTC(t), nil + } + return v, nil + }, + }) + } + } + + return plan +} + +func computeStorageTransformationPlan( + ctx context.Context, + azureClient *azure.Client, + toStorage config.ToStorageConfig, + sourceColumns []models.ColumnType, + sourceTable config.SourceTableInfo, +) []etl.ColumnTransformPlan { + if azureClient == nil || len(toStorage.Columns) == 0 { + return nil + } + + colIndex := make(map[string]int, len(sourceColumns)) + for i, col := range sourceColumns { + colIndex[strings.ToUpper(col.Name())] = i + } + + var plan []etl.ColumnTransformPlan + for _, storageCol := range toStorage.Columns { + if storageCol.Mode != "REFERENCE_ONLY" { + logrus.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source) + continue + } + + idx, ok := colIndex[strings.ToUpper(storageCol.Source)] + if !ok { + logrus.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source) + continue + } + + sourceColName := storageCol.Source + schema := sourceTable.Schema + table := sourceTable.Table + + plan = append(plan, etl.ColumnTransformPlan{ + Index: idx, + Fn: func(v any) (any, error) { + if v == nil { + return nil, nil + } + b, ok := v.([]byte) + if !ok { + logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through", + schema, table, sourceColName, v) + return v, nil + } + start := time.Now() + blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String()) + blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b) + if err != nil { + return nil, &custom_errors.JobError{ + Msg: fmt.Sprintf("Error uploading %s.%s.%s", schema, table, sourceColName), + Prev: err, + } + } + + logrus.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds()) + return blobURL, nil + }, + }) + } + return plan +} diff --git a/internal/app/etl/transformers/process-with-retries.go b/internal/app/etl/transformers/process-with-retries.go new file mode 100644 index 0000000..dbf42ef --- /dev/null +++ b/internal/app/etl/transformers/process-with-retries.go @@ -0,0 +1,73 @@ +package transformers + +import ( + "context" + "errors" + "time" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +const processBatchCtxCheck = 4096 + +func ProcessBatchWithRetries( + ctx context.Context, + batch *models.Batch, + transformationPlan []etl.ColumnTransformPlan, + retryConfig config.RetryConfig, +) error { + for i, rowValues := range batch.Rows { + if i%processBatchCtxCheck == 0 { + if err := ctx.Err(); err != nil { + return err + } + } + + for _, task := range transformationPlan { + val := rowValues[task.Index] + if val == nil { + continue + } + + var lastErr error + success := false + + for attempt := 0; attempt < retryConfig.Attempts; attempt++ { + transformed, err := task.Fn(val) + if err == nil { + rowValues[task.Index] = transformed + success = true + break + } + + lastErr = err + if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { + if jobError.ShouldCancelJob { + return jobError + } + } + + if attempt == retryConfig.Attempts-1 { + break + } + + delay := custom_errors.ComputeBackoffDelay( + attempt, + retryConfig.BaseDelayMs, + retryConfig.MaxDelayMs, + retryConfig.MaxJitterMs, + ) + time.Sleep(delay) + } + + if !success { + return lastErr + } + } + } + + return nil +} diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index cb1bc1a..1322097 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -29,17 +29,12 @@ type ColumnTransformPlan struct { } type Transformer interface { - ProcessBatch( - ctx context.Context, - batch *models.Batch, - transformationPlan []ColumnTransformPlan, - ) error - - Exec( + Consume( ctx context.Context, columns []models.ColumnType, + retryConfig config.RetryConfig, chBatchesIn <-chan models.Batch, - chBactchesOut chan<- models.Batch, + chBatchesOut chan<- models.Batch, chJobErrorsOut chan<- custom_errors.JobError, wgActiveBatches *sync.WaitGroup, )