refactor: streamline error handling in migration process; consolidate failed partitions and batches tracking

This commit is contained in:
2026-05-08 23:22:53 -05:00
parent d54108d5e5
commit 85d7d69da9
5 changed files with 39 additions and 17 deletions

View File

@@ -57,8 +57,6 @@ func processMigrationJob(
StartTime: time.Now(), StartTime: time.Now(),
} }
var rowsRead, rowsLoaded, rowsFailed int64
var wgQueryColumnTypes errgroup.Group var wgQueryColumnTypes errgroup.Group
var sourceColTypes, targetColTypes []models.ColumnType var sourceColTypes, targetColTypes []models.ColumnType
@@ -118,11 +116,9 @@ func processMigrationJob(
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize) chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize) chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
var wgActivePartitions sync.WaitGroup var wgActivePartitions, wgActiveBatches, wgExtractors, wgTransformers, wgLoaders sync.WaitGroup
var wgActiveBatches sync.WaitGroup var rowsRead, rowsLoaded, rowsFailed int64
var wgExtractors sync.WaitGroup var failedPartitionsCount, failedBatchesLoadCount int32
var wgTransformers sync.WaitGroup
var wgLoaders sync.WaitGroup
go func() { go func() {
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil { if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
@@ -148,6 +144,7 @@ func processMigrationJob(
chJobErrors, chJobErrors,
&wgActivePartitions, &wgActivePartitions,
&rowsRead, &rowsRead,
&failedPartitionsCount,
job.SourceTable.FromJsonColumns, job.SourceTable.FromJsonColumns,
) )
}) })
@@ -188,6 +185,7 @@ func processMigrationJob(
chJobErrors, chJobErrors,
&wgActiveBatches, &wgActiveBatches,
&rowsLoaded, &rowsLoaded,
&failedBatchesLoadCount,
) )
}) })
} }

View File

@@ -20,7 +20,6 @@ defaults:
max_delay_ms: 10000 max_delay_ms: 10000
max_jitter_ms: 500 max_jitter_ms: 500
max_failed_partitions: 5 max_failed_partitions: 5
max_failed_batches_transform: 5
max_failed_batches_load: 5 max_failed_batches_load: 5
jobs: jobs:

View File

@@ -13,7 +13,6 @@ type RetryConfig struct {
MaxDelayMs int `yaml:"max_delay_ms"` MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"` MaxJitterMs int `yaml:"max_jitter_ms"`
MaxFailedPartitions int `yaml:"max_failed_partitions"` MaxFailedPartitions int `yaml:"max_failed_partitions"`
MaxFailedBatchesTransform int `yaml:"max_failed_batches_transform"`
MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"` MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"`
} }

View File

@@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume(
chErrorsOut chan<- custom_errors.JobError, chErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup, wgActivePartitions *sync.WaitGroup,
rowsRead *int64, rowsRead *int64,
failedPartitionsCount *int32,
fromJsonColumns []config.FromJsonItem, fromJsonColumns []config.FromJsonItem,
) { ) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
@@ -77,6 +78,7 @@ func (ex *GenericExtractor) Consume(
} }
if err != nil { if err != nil {
atomic.AddInt32(failedPartitionsCount, 1)
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -90,6 +92,16 @@ func (ex *GenericExtractor) Consume(
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
} }
} }
currentFPCount := atomic.LoadInt32(failedPartitionsCount)
if currentFPCount > int32(retryConfig.MaxFailedPartitions) {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed partitions reached"}:
return
}
}
} }
} }
} }

View File

@@ -20,6 +20,7 @@ func (gl *GenericLoader) Consume(
chErrorsOut chan<- custom_errors.JobError, chErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup, wgActiveBatches *sync.WaitGroup,
rowsLoaded *int64, rowsLoaded *int64,
failedBatchesCount *int32,
) { ) {
colNames := mapSlice(columns, func(col models.ColumnType) string { colNames := mapSlice(columns, func(col models.ColumnType) string {
return col.Name() return col.Name()
@@ -42,6 +43,7 @@ func (gl *GenericLoader) Consume(
wgActiveBatches.Done() wgActiveBatches.Done()
if err != nil { if err != nil {
atomic.AddInt32(failedBatchesCount, 1)
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -55,9 +57,21 @@ func (gl *GenericLoader) Consume(
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
} }
} }
} else {
currentFBCount := atomic.LoadInt32(failedBatchesCount)
if currentFBCount > int32(retryConfig.MaxFailedBatchesLoad) {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
return
}
}
continue
}
atomic.AddInt64(rowsLoaded, int64(processedRows)) atomic.AddInt64(rowsLoaded, int64(processedRows))
} }
} }
} }
}