feat: add max partition and chunk error limits to extractor and loader error handlers

This commit is contained in:
2026-04-12 20:57:31 -05:00
parent f126d5bbd0
commit 85074da2ec
5 changed files with 44 additions and 2 deletions

View File

@@ -98,6 +98,7 @@ func processMigrationJob(
go custom_errors.ExtractorErrorHandler(
jobCtx,
job.Retry,
job.MaxPartitionErrrors,
chExtractorErrors,
chPartitions,
chJobErrors,
@@ -106,6 +107,7 @@ func processMigrationJob(
go custom_errors.LoaderErrorHandler(
jobCtx,
job.Retry,
job.MaxChunkErrors,
chLoadersErrors,
chBatchesTransformed,
chJobErrors,

View File

@@ -10,6 +10,8 @@ defaults:
batches_per_partition: 8
truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE
max_partition_errrors: 5
max_chunk_errors: 5
retry:
attempts: 3
base_delay_ms: 500

View File

@@ -22,6 +22,8 @@ type JobConfig struct {
BatchesPerPartition int `yaml:"batches_per_partition"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
MaxChunkErrors int `yaml:"max_chunk_errors"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
}

View File

@@ -24,11 +24,14 @@ func (e *ExtractorError) Error() string {
func ExtractorErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxPartitionErrors int,
chErrorsIn <-chan ExtractorError,
chPartitionsOut chan<- models.Partition,
chJobErrorsOut chan<- JobError,
wgActivePartitions *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
@@ -45,6 +48,7 @@ func ExtractorErrorHandler(
if err.Partition.RetryCounter >= retryConfig.Attempts {
wgActivePartitions.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
@@ -57,6 +61,20 @@ func ExtractorErrorHandler(
return
}
if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{

View File

@@ -21,11 +21,14 @@ func (e *LoaderError) Error() string {
func LoaderErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxChunkErrors int,
chErrorsIn <-chan LoaderError,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
@@ -42,6 +45,7 @@ func LoaderErrorHandler(
if err.Batch.RetryCounter >= retryConfig.Attempts {
wgActiveBatches.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
@@ -54,6 +58,20 @@ func LoaderErrorHandler(
return
}
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{