From b690e580c56c8d71c1b6973980515a9475c44101 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Sat, 9 May 2026 01:16:34 -0500 Subject: [PATCH] refactor: enhance logging and batch processing in migration; adjust configuration parameters for improved performance --- cmd/go_migrate/main.go | 2 +- cmd/go_migrate/process.go | 36 ++++--- config.yaml | 68 ++++++------ internal/app/db-wrapper/mssql.go | 5 +- internal/app/etl/loaders/consume.go | 127 ++++++++++++++++++----- internal/app/etl/transformers/consume.go | 93 +++++++++++------ internal/app/etl/types.go | 1 + internal/app/models/main.go | 9 +- 8 files changed, 229 insertions(+), 112 deletions(-) diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 6b1c61e..7c6764c 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -86,7 +86,7 @@ func main() { log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed) totalDuration := time.Since(startTime) - log.Infof("=== Migration completed successfully! ===") + // log.Infof("=== Migration completed successfully! ===") log.Infof("Total migration time: %v", totalDuration) } diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 57c40c9..efd4a9c 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -129,7 +129,7 @@ func processMigrationJob( }() maxExtractors := min(job.MaxExtractors, len(partitions)) - log.Infof("Starting %d extractor(s)...", maxExtractors) + log.Infof("Starting %d extractor(s)... (%v)", maxExtractors, job.Name) for range maxExtractors { wgExtractors.Go(func() { @@ -157,7 +157,7 @@ func processMigrationJob( } }() - log.Infof("Starting %d transformer(s)...", maxExtractors) + log.Infof("Starting %d transformer(s)... (%v)", maxExtractors, job.Name) for range maxExtractors { wgTransformers.Go(func() { @@ -165,6 +165,7 @@ func processMigrationJob( localCtx, sourceColTypes, job.Retry, + job.TransformerBatchSize, chBatchesRaw, chBatchesTransformed, chJobErrors, @@ -173,7 +174,7 @@ func processMigrationJob( }) } - log.Infof("Starting %d loader(s)...", job.MaxLoaders) + log.Infof("Starting %d loader(s)... (%v)", job.MaxLoaders, job.Name) for range job.MaxLoaders { wgLoaders.Go(func() { @@ -182,6 +183,7 @@ func processMigrationJob( job.TargetTable, targetColTypes, job.Retry, + job.LoaderBatchSize, chBatchesTransformed, chJobErrors, &wgActiveBatches, @@ -192,28 +194,28 @@ func processMigrationJob( } go func() { - log.Debugf("Waiting for goroutines (%v)", job.Name) + // log.Debugf("Waiting for goroutines (%v)", job.Name) wgActivePartitions.Wait() - log.Debugf("wgActivePartitions is empty (%v)", job.Name) + // log.Debugf("wgActivePartitions is empty (%v)", job.Name) close(chPartitions) - log.Debugf("chPartitions is closed (%v)", job.Name) + // log.Debugf("chPartitions is closed (%v)", job.Name) wgExtractors.Wait() - log.Debugf("wgExtractors is empty (%v)", job.Name) + // log.Debugf("wgExtractors is empty (%v)", job.Name) close(chBatchesRaw) - log.Debugf("chBatchesRaw is closed (%v)", job.Name) + // log.Debugf("chBatchesRaw is closed (%v)", job.Name) wgTransformers.Wait() - log.Debugf("wgTransformers is empty (%v)", job.Name) + // log.Debugf("wgTransformers is empty (%v)", job.Name) + close(chBatchesTransformed) + // log.Debugf("chBatchesTransformed is closed (%v)", job.Name) wgActiveBatches.Wait() - log.Debugf("wgActiveBatches is empty (%v)", job.Name) - close(chBatchesTransformed) - log.Debugf("chBatchesTransformed is empty (%v)", job.Name) + // log.Debugf("wgActiveBatches is empty (%v)", job.Name) wgLoaders.Wait() - log.Debugf("wgLoaders is empty (%v)", job.Name) + // log.Debugf("wgLoaders is empty (%v)", job.Name) cancel() }() @@ -225,9 +227,9 @@ func processMigrationJob( } } - log.Debugf("waiting for local context to be done (%v)", job.Name) + // log.Debugf("waiting for local context to be done (%v)", job.Name) <-localCtx.Done() - log.Debugf("local context done (%v)", job.Name) + // log.Debugf("local context done (%v)", job.Name) if ctx.Err() != nil { result.Error = ctx.Err() @@ -242,5 +244,9 @@ func processMigrationJob( result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed) } + if result.RowsRead == 0 { + log.Warnf("No rows extracted from (%v)", job.Name) + } + return result } diff --git a/config.yaml b/config.yaml index bddf736..8d57bfa 100644 --- a/config.yaml +++ b/config.yaml @@ -3,12 +3,12 @@ source_db_type: sqlserver target_db_type: postgres defaults: - batches_per_partition: 8 + batches_per_partition: 4 max_extractors: 2 - extractor_batch_size: 25000 + extractor_batch_size: 5000 extractor_queue_size: 8 max_transformers: 2 - transformer_batch_size: 25000 + transformer_batch_size: 12500 transformer_queue_size: 8 max_loaders: 4 loader_batch_size: 25000 @@ -34,11 +34,11 @@ jobs: table: MANZANA pre_sql: - 'SELECT 1' - # range: - # min: 1000000 - # max: 2000000 - # is_min_inclusive: false - # is_max_inclusive: true + range: + min: 1000000 + max: 2000000 + is_min_inclusive: false + is_max_inclusive: true - name: red_puerto enabled: true @@ -57,29 +57,29 @@ jobs: post_sql: - "SELECT 1" - # - name: infraestructura_site_holder__attach - # source: - # schema: Infraestructura - # table: SITE_HOLDER__ATTACH - # primary_key: GDB_ARCHIVE_OID - # target: - # schema: Infraestructura - # table: SITE_HOLDER__ATTACH - # to_storage: - # columns: - # - source: DATA - # target: FILE_URL - # mode: REFERENCE_ONLY - # batches_per_partition: 10000 - # max_extractors: 8 - # extractor_queue_size: 32 - # extractor_batch_size: 1 - # max_transformers: 16 - # transformer_batch_size: 20000 - # transformer_queue_size: 8 - # max_loaders: 4 - # retry: - # attempts: 5 - # base_delay_ms: 1000 - # max_delay_ms: 15000 - # max_jitter_ms: 500 + - name: infraestructura_site_holder__attach + source: + schema: Infraestructura + table: SITE_HOLDER__ATTACH + primary_key: GDB_ARCHIVE_OID + target: + schema: Infraestructura + table: SITE_HOLDER__ATTACH + to_storage: + columns: + - source: DATA + target: FILE_URL + mode: REFERENCE_ONLY + batches_per_partition: 10000 + max_extractors: 8 + extractor_queue_size: 32 + extractor_batch_size: 1 + max_transformers: 16 + transformer_batch_size: 20000 + transformer_queue_size: 8 + max_loaders: 4 + retry: + attempts: 5 + base_delay_ms: 1000 + max_delay_ms: 15000 + max_jitter_ms: 500 diff --git a/internal/app/db-wrapper/mssql.go b/internal/app/db-wrapper/mssql.go index e3b2090..0fcd972 100644 --- a/internal/app/db-wrapper/mssql.go +++ b/internal/app/db-wrapper/mssql.go @@ -10,7 +10,6 @@ import ( dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" mssql "github.com/microsoft/go-mssqldb" - "github.com/sirupsen/logrus" ) func init() { @@ -188,8 +187,6 @@ func buildExtractQueryMssql(q ExtractionQuery) (string, error) { hasRegularColumns := len(q.Columns) > 0 hasJsonColumns := len(q.FromJsonColumns) > 0 - // logrus.Debugf("Extraction query: %+v", q) - resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns)) if hasJsonColumns { for _, jsonConfig := range q.FromJsonColumns { @@ -296,7 +293,7 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery return nil, err } - logrus.Debugf("Query: %s", queryString) + // logrus.Debugf("Query: %s", queryString) var queryArgs []any diff --git a/internal/app/etl/loaders/consume.go b/internal/app/etl/loaders/consume.go index 9b8f19b..1851f76 100644 --- a/internal/app/etl/loaders/consume.go +++ b/internal/app/etl/loaders/consume.go @@ -9,6 +9,8 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" + "github.com/sirupsen/logrus" ) func (gl *GenericLoader) Consume( @@ -16,6 +18,7 @@ func (gl *GenericLoader) Consume( tableInfo config.TargetTableInfo, columns []models.ColumnType, retryConfig config.RetryConfig, + batchSize int, chBatchesIn <-chan models.Batch, chErrorsOut chan<- custom_errors.JobError, wgActiveBatches *sync.WaitGroup, @@ -26,6 +29,66 @@ func (gl *GenericLoader) Consume( return col.Name() }) + var accRows []models.UnknownRowValues + var parentBatchesId []uuid.UUID + pendingDone := 0 + + defer func() { + for range pendingDone { + wgActiveBatches.Done() + } + }() + + flush := func() bool { + if len(accRows) == 0 { + return true + } + count := len(parentBatchesId) + superBatch := models.Batch{ + Id: uuid.New(), + ParentBatchesId: parentBatchesId, + Rows: accRows, + } + processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, superBatch) + for range count { + wgActiveBatches.Done() + } + pendingDone -= count + accRows = nil + parentBatchesId = nil + + if err != nil { + atomic.AddInt32(failedBatchesCount, 1) + if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { + select { + case <-ctx.Done(): + return false + case chErrorsOut <- *jobError: + } + } else { + select { + case <-ctx.Done(): + return false + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: + } + } + + if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) { + select { + case <-ctx.Done(): + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}: + } + return false + } + return true + } + + current := atomic.LoadInt64(rowsLoaded) + logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table) + atomic.AddInt64(rowsLoaded, int64(processedRows)) + return true + } + for { if ctx.Err() != nil { return @@ -36,42 +99,56 @@ func (gl *GenericLoader) Consume( return case batch, ok := <-chBatchesIn: if !ok { + flush() return } - processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch) - wgActiveBatches.Done() + if batchSize <= 0 { + processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch) + wgActiveBatches.Done() - if err != nil { - atomic.AddInt32(failedBatchesCount, 1) - if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { - select { - case <-ctx.Done(): - return - case chErrorsOut <- *jobError: + if err != nil { + atomic.AddInt32(failedBatchesCount, 1) + if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { + select { + case <-ctx.Done(): + return + case chErrorsOut <- *jobError: + } + } else { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: + } } - } else { - select { - case <-ctx.Done(): - return - case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: - } - } - - currentFBCount := atomic.LoadInt32(failedBatchesCount) - if currentFBCount > int32(retryConfig.MaxFailedBatchesLoad) { - select { - case <-ctx.Done(): - return - case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}: - return + + if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}: + return + } } + continue } + current := atomic.LoadInt64(rowsLoaded) + logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table) + atomic.AddInt64(rowsLoaded, int64(processedRows)) continue } - atomic.AddInt64(rowsLoaded, int64(processedRows)) + pendingDone++ + accRows = append(accRows, batch.Rows...) + parentBatchesId = append(parentBatchesId, batch.Id) + + if len(accRows) >= batchSize { + if !flush() { + return + } + } } } } diff --git a/internal/app/etl/transformers/consume.go b/internal/app/etl/transformers/consume.go index f8545a0..bd3a92d 100644 --- a/internal/app/etl/transformers/consume.go +++ b/internal/app/etl/transformers/consume.go @@ -8,12 +8,14 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" ) func (mssqlTr *MssqlTransformer) Consume( ctx context.Context, columns []models.ColumnType, retryConfig config.RetryConfig, + batchSize int, chBatchesIn <-chan models.Batch, chBatchesOut chan<- models.Batch, chJobErrorsOut chan<- custom_errors.JobError, @@ -23,6 +25,32 @@ func (mssqlTr *MssqlTransformer) Consume( storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable) transformationPlan = append(transformationPlan, storagePlan...) + var accRows []models.UnknownRowValues + var parentBatchesId []uuid.UUID + var firstPartitionId uuid.UUID + + flush := func() bool { + if len(accRows) == 0 { + return true + } + out := models.Batch{ + Id: uuid.New(), + PartitionId: firstPartitionId, + ParentBatchesId: parentBatchesId, + Rows: accRows, + } + select { + case chBatchesOut <- out: + wgActiveBatches.Add(1) + case <-ctx.Done(): + return false + } + accRows = nil + parentBatchesId = nil + firstPartitionId = uuid.Nil + return true + } + for { if ctx.Err() != nil { return @@ -34,49 +62,56 @@ func (mssqlTr *MssqlTransformer) Consume( case batch, ok := <-chBatchesIn: if !ok { + flush() return } - if len(transformationPlan) == 0 { + if len(transformationPlan) > 0 { + err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig) + if err != nil { + if errors.Is(err, ctx.Err()) { + return + } + + if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { + select { + case chJobErrorsOut <- *jobError: + case <-ctx.Done(): + return + } + } else { + select { + case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: + case <-ctx.Done(): + return + } + } + + return + } + } + + if batchSize <= 0 { select { case chBatchesOut <- batch: wgActiveBatches.Add(1) - continue case <-ctx.Done(): return } + continue } - err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig) - if err != nil { - if errors.Is(err, ctx.Err()) { + if len(parentBatchesId) == 0 { + firstPartitionId = batch.PartitionId + } + accRows = append(accRows, batch.Rows...) + parentBatchesId = append(parentBatchesId, batch.Id) + + if len(accRows) >= batchSize { + if !flush() { return } - - if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { - select { - case chJobErrorsOut <- *jobError: - case <-ctx.Done(): - return - } - } else { - select { - case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: - case <-ctx.Done(): - return - } - } - - return } - - select { - case chBatchesOut <- batch: - case <-ctx.Done(): - return - } - - wgActiveBatches.Add(1) } } } diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index 81724f6..a1970ae 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -21,6 +21,7 @@ type Transformer interface { ctx context.Context, columns []models.ColumnType, retryConfig config.RetryConfig, + batchSize int, chBatchesIn <-chan models.Batch, chBatchesOut chan<- models.Batch, chJobErrorsOut chan<- custom_errors.JobError, diff --git a/internal/app/models/main.go b/internal/app/models/main.go index 60eb73e..5becf6a 100644 --- a/internal/app/models/main.go +++ b/internal/app/models/main.go @@ -9,10 +9,11 @@ import ( type UnknownRowValues = []any type Batch struct { - Id uuid.UUID - PartitionId uuid.UUID - Rows []UnknownRowValues - RetryCounter int + Id uuid.UUID + PartitionId uuid.UUID + ParentBatchesId []uuid.UUID + Rows []UnknownRowValues + RetryCounter int } type PartitionRange struct {