From 85d7d69da9f8f40bc6956dd20fb6cf1dd9a0427e Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Fri, 8 May 2026 23:22:53 -0500 Subject: [PATCH] refactor: streamline error handling in migration process; consolidate failed partitions and batches tracking --- cmd/go_migrate/process.go | 12 +++++------- config.yaml | 1 - internal/app/config/migration.go | 13 ++++++------- internal/app/etl/extractors/consume.go | 12 ++++++++++++ internal/app/etl/loaders/consume.go | 18 ++++++++++++++++-- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index ec30597..1210f1c 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -57,8 +57,6 @@ func processMigrationJob( StartTime: time.Now(), } - var rowsRead, rowsLoaded, rowsFailed int64 - var wgQueryColumnTypes errgroup.Group var sourceColTypes, targetColTypes []models.ColumnType @@ -118,11 +116,9 @@ func processMigrationJob( chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize) chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize) - var wgActivePartitions sync.WaitGroup - var wgActiveBatches sync.WaitGroup - var wgExtractors sync.WaitGroup - var wgTransformers sync.WaitGroup - var wgLoaders sync.WaitGroup + var wgActivePartitions, wgActiveBatches, wgExtractors, wgTransformers, wgLoaders sync.WaitGroup + var rowsRead, rowsLoaded, rowsFailed int64 + var failedPartitionsCount, failedBatchesLoadCount int32 go func() { if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil { @@ -148,6 +144,7 @@ func processMigrationJob( chJobErrors, &wgActivePartitions, &rowsRead, + &failedPartitionsCount, job.SourceTable.FromJsonColumns, ) }) @@ -188,6 +185,7 @@ func processMigrationJob( chJobErrors, &wgActiveBatches, &rowsLoaded, + &failedBatchesLoadCount, ) }) } diff --git a/config.yaml b/config.yaml index d71c30e..bddf736 100644 --- a/config.yaml +++ b/config.yaml @@ -20,7 +20,6 @@ defaults: max_delay_ms: 10000 max_jitter_ms: 500 max_failed_partitions: 5 - max_failed_batches_transform: 5 max_failed_batches_load: 5 jobs: diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index e897683..39309f7 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -8,13 +8,12 @@ import ( ) type RetryConfig struct { - Attempts int `yaml:"attempts"` - BaseDelayMs int `yaml:"base_delay_ms"` - MaxDelayMs int `yaml:"max_delay_ms"` - MaxJitterMs int `yaml:"max_jitter_ms"` - MaxFailedPartitions int `yaml:"max_failed_partitions"` - MaxFailedBatchesTransform int `yaml:"max_failed_batches_transform"` - MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"` + Attempts int `yaml:"attempts"` + BaseDelayMs int `yaml:"base_delay_ms"` + MaxDelayMs int `yaml:"max_delay_ms"` + MaxJitterMs int `yaml:"max_jitter_ms"` + MaxFailedPartitions int `yaml:"max_failed_partitions"` + MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"` } type ToStorageColumnConfig struct { diff --git a/internal/app/etl/extractors/consume.go b/internal/app/etl/extractors/consume.go index da5cab0..3090aab 100644 --- a/internal/app/etl/extractors/consume.go +++ b/internal/app/etl/extractors/consume.go @@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume( chErrorsOut chan<- custom_errors.JobError, wgActivePartitions *sync.WaitGroup, rowsRead *int64, + failedPartitionsCount *int32, fromJsonColumns []config.FromJsonItem, ) { indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { @@ -77,6 +78,7 @@ func (ex *GenericExtractor) Consume( } if err != nil { + atomic.AddInt32(failedPartitionsCount, 1) if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { select { case <-ctx.Done(): @@ -90,6 +92,16 @@ func (ex *GenericExtractor) Consume( case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: } } + + currentFPCount := atomic.LoadInt32(failedPartitionsCount) + if currentFPCount > int32(retryConfig.MaxFailedPartitions) { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed partitions reached"}: + return + } + } } } } diff --git a/internal/app/etl/loaders/consume.go b/internal/app/etl/loaders/consume.go index d634125..9b8f19b 100644 --- a/internal/app/etl/loaders/consume.go +++ b/internal/app/etl/loaders/consume.go @@ -20,6 +20,7 @@ func (gl *GenericLoader) Consume( chErrorsOut chan<- custom_errors.JobError, wgActiveBatches *sync.WaitGroup, rowsLoaded *int64, + failedBatchesCount *int32, ) { colNames := mapSlice(columns, func(col models.ColumnType) string { return col.Name() @@ -42,6 +43,7 @@ func (gl *GenericLoader) Consume( wgActiveBatches.Done() if err != nil { + atomic.AddInt32(failedBatchesCount, 1) if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { select { case <-ctx.Done(): @@ -55,9 +57,21 @@ func (gl *GenericLoader) Consume( case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: } } - } else { - atomic.AddInt64(rowsLoaded, int64(processedRows)) + + currentFBCount := atomic.LoadInt32(failedBatchesCount) + if currentFBCount > int32(retryConfig.MaxFailedBatchesLoad) { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}: + return + } + } + + continue } + + atomic.AddInt64(rowsLoaded, int64(processedRows)) } } }