refactor: move max failed batches configuration to retry section; clean up unused error handling code

This commit is contained in:
2026-05-08 23:00:23 -05:00
parent 212d3663e2
commit d54108d5e5
4 changed files with 24 additions and 137 deletions

View File

@@ -12,9 +12,6 @@ defaults:
transformer_queue_size: 8
max_loaders: 4
loader_batch_size: 25000
max_failed_partitions: 5
max_failed_batches_transform: 5
max_failed_batches_load: 5
truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE
retry:
@@ -22,6 +19,9 @@ defaults:
base_delay_ms: 500
max_delay_ms: 10000
max_jitter_ms: 500
max_failed_partitions: 5
max_failed_batches_transform: 5
max_failed_batches_load: 5
jobs:
- name: cartografia_manzana

View File

@@ -8,10 +8,13 @@ import (
)
type RetryConfig struct {
Attempts int `yaml:"attempts"`
BaseDelayMs int `yaml:"base_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"`
Attempts int `yaml:"attempts"`
BaseDelayMs int `yaml:"base_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"`
MaxFailedPartitions int `yaml:"max_failed_partitions"`
MaxFailedBatchesTransform int `yaml:"max_failed_batches_transform"`
MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"`
}
type ToStorageColumnConfig struct {
@@ -25,23 +28,20 @@ type ToStorageConfig struct {
}
type JobConfig struct {
BatchesPerPartition int `yaml:"batches_per_partition"`
MaxExtractors int `yaml:"max_extractors"`
ExtractorBatchSize int `yaml:"extractor_batch_size"`
ExtractorQueueSize int `yaml:"extractor_queue_size"`
MaxTransformers int `yaml:"max_transformers"`
TransformerBatchSize int `yaml:"transformer_batch_size"`
TransformerQueueSize int `yaml:"transformer_queue_size"`
MaxLoaders int `yaml:"max_loaders"`
LoaderBatchSize int `yaml:"loader_batch_size"`
MaxFailedPartitions int `yaml:"max_failed_partitions"`
MaxFailedBatchesTransform int `yaml:"max_failed_batches_transform"`
MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
BatchesPerPartition int `yaml:"batches_per_partition"`
MaxExtractors int `yaml:"max_extractors"`
ExtractorBatchSize int `yaml:"extractor_batch_size"`
ExtractorQueueSize int `yaml:"extractor_queue_size"`
MaxTransformers int `yaml:"max_transformers"`
TransformerBatchSize int `yaml:"transformer_batch_size"`
TransformerQueueSize int `yaml:"transformer_queue_size"`
MaxLoaders int `yaml:"max_loaders"`
LoaderBatchSize int `yaml:"loader_batch_size"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
}
type FromJsonItem struct {

View File

@@ -1,7 +1,6 @@
package custom_errors
import (
"context"
"math/rand"
"time"
)
@@ -40,22 +39,3 @@ func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJ
return delay
}
func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) {
if delay <= 0 {
enqueue()
return
}
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return
case <-timer.C:
enqueue()
}
}()
}

View File

@@ -1,11 +1,6 @@
package custom_errors
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
@@ -17,91 +12,3 @@ type LoaderError struct {
func (e *LoaderError) Error() string {
return e.Msg
}
func LoaderErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxChunkErrors int,
chErrorsIn <-chan LoaderError,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Batch.RetryCounter >= retryConfig.Attempts {
wgActiveBatches.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
err.Batch.RetryCounter++
delay := ComputeBackoffDelay(
err.Batch.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
requeueWithBackoff(ctx, delay, func() {
select {
case chBatchesOut <- err.Batch:
case <-ctx.Done():
return
}
})
}
}
}