diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 3930922..d159dc5 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -98,6 +98,7 @@ func processMigrationJob( go custom_errors.ExtractorErrorHandler( jobCtx, job.Retry, + job.MaxPartitionErrrors, chExtractorErrors, chPartitions, chJobErrors, @@ -106,6 +107,7 @@ func processMigrationJob( go custom_errors.LoaderErrorHandler( jobCtx, job.Retry, + job.MaxChunkErrors, chLoadersErrors, chBatchesTransformed, chJobErrors, diff --git a/config.yaml b/config.yaml index ae6243a..cad138e 100644 --- a/config.yaml +++ b/config.yaml @@ -10,6 +10,8 @@ defaults: batches_per_partition: 8 truncate_target: true truncate_method: TRUNCATE # TRUNCATE | DELETE + max_partition_errrors: 5 + max_chunk_errors: 5 retry: attempts: 3 base_delay_ms: 500 diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index 4819c4a..f3ceee4 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -8,9 +8,9 @@ import ( ) type RetryConfig struct { - Attempts int `yaml:"attempts"` + Attempts int `yaml:"attempts"` BaseDelayMs int `yaml:"base_delay_ms"` - MaxDelayMs int `yaml:"max_delay_ms"` + MaxDelayMs int `yaml:"max_delay_ms"` MaxJitterMs int `yaml:"max_jitter_ms"` } @@ -22,6 +22,8 @@ type JobConfig struct { BatchesPerPartition int `yaml:"batches_per_partition"` TruncateTarget bool `yaml:"truncate_target"` TruncateMethod string `yaml:"truncate_method"` + MaxPartitionErrrors int `yaml:"max_partition_errrors"` + MaxChunkErrors int `yaml:"max_chunk_errors"` Retry RetryConfig `yaml:"retry"` RowsPerPartition int64 } diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go index 5ab02e7..cea8da4 100644 --- a/internal/app/custom_errors/extractor.error.go +++ b/internal/app/custom_errors/extractor.error.go @@ -24,11 +24,14 @@ func (e *ExtractorError) Error() string { func ExtractorErrorHandler( ctx context.Context, retryConfig config.RetryConfig, + maxPartitionErrors int, chErrorsIn <-chan ExtractorError, chPartitionsOut chan<- models.Partition, chJobErrorsOut chan<- JobError, wgActivePartitions *sync.WaitGroup, ) { + definitiveErrors := 0 + for { if ctx.Err() != nil { return @@ -45,6 +48,7 @@ func ExtractorErrorHandler( if err.Partition.RetryCounter >= retryConfig.Attempts { wgActivePartitions.Done() + definitiveErrors++ jobError := JobError{ ShouldCancelJob: false, Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts), @@ -57,6 +61,20 @@ func ExtractorErrorHandler( return } + if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors { + fatalError := JobError{ + ShouldCancelJob: true, + Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors), + Prev: &err, + } + + select { + case chJobErrorsOut <- fatalError: + case <-ctx.Done(): + return + } + } + continue } else { jobError := JobError{ diff --git a/internal/app/custom_errors/loader.error.go b/internal/app/custom_errors/loader.error.go index 30dae24..189b72e 100644 --- a/internal/app/custom_errors/loader.error.go +++ b/internal/app/custom_errors/loader.error.go @@ -21,11 +21,14 @@ func (e *LoaderError) Error() string { func LoaderErrorHandler( ctx context.Context, retryConfig config.RetryConfig, + maxChunkErrors int, chErrorsIn <-chan LoaderError, chBatchesOut chan<- models.Batch, chJobErrorsOut chan<- JobError, wgActiveBatches *sync.WaitGroup, ) { + definitiveErrors := 0 + for { if ctx.Err() != nil { return @@ -42,6 +45,7 @@ func LoaderErrorHandler( if err.Batch.RetryCounter >= retryConfig.Attempts { wgActiveBatches.Done() + definitiveErrors++ jobError := JobError{ ShouldCancelJob: false, Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts), @@ -54,6 +58,20 @@ func LoaderErrorHandler( return } + if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors { + fatalError := JobError{ + ShouldCancelJob: true, + Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors), + Prev: &err, + } + + select { + case chJobErrorsOut <- fatalError: + case <-ctx.Done(): + return + } + } + continue } else { jobError := JobError{