diff --git a/config.yaml b/config.yaml index f8887b1..d71c30e 100644 --- a/config.yaml +++ b/config.yaml @@ -12,9 +12,6 @@ defaults: transformer_queue_size: 8 max_loaders: 4 loader_batch_size: 25000 - max_failed_partitions: 5 - max_failed_batches_transform: 5 - max_failed_batches_load: 5 truncate_target: true truncate_method: TRUNCATE # TRUNCATE | DELETE retry: @@ -22,6 +19,9 @@ defaults: base_delay_ms: 500 max_delay_ms: 10000 max_jitter_ms: 500 + max_failed_partitions: 5 + max_failed_batches_transform: 5 + max_failed_batches_load: 5 jobs: - name: cartografia_manzana diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index dc19935..e897683 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -8,10 +8,13 @@ import ( ) type RetryConfig struct { - Attempts int `yaml:"attempts"` - BaseDelayMs int `yaml:"base_delay_ms"` - MaxDelayMs int `yaml:"max_delay_ms"` - MaxJitterMs int `yaml:"max_jitter_ms"` + Attempts int `yaml:"attempts"` + BaseDelayMs int `yaml:"base_delay_ms"` + MaxDelayMs int `yaml:"max_delay_ms"` + MaxJitterMs int `yaml:"max_jitter_ms"` + MaxFailedPartitions int `yaml:"max_failed_partitions"` + MaxFailedBatchesTransform int `yaml:"max_failed_batches_transform"` + MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"` } type ToStorageColumnConfig struct { @@ -25,23 +28,20 @@ type ToStorageConfig struct { } type JobConfig struct { - BatchesPerPartition int `yaml:"batches_per_partition"` - MaxExtractors int `yaml:"max_extractors"` - ExtractorBatchSize int `yaml:"extractor_batch_size"` - ExtractorQueueSize int `yaml:"extractor_queue_size"` - MaxTransformers int `yaml:"max_transformers"` - TransformerBatchSize int `yaml:"transformer_batch_size"` - TransformerQueueSize int `yaml:"transformer_queue_size"` - MaxLoaders int `yaml:"max_loaders"` - LoaderBatchSize int `yaml:"loader_batch_size"` - MaxFailedPartitions int `yaml:"max_failed_partitions"` - MaxFailedBatchesTransform int `yaml:"max_failed_batches_transform"` - MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"` - TruncateTarget bool `yaml:"truncate_target"` - TruncateMethod string `yaml:"truncate_method"` - Retry RetryConfig `yaml:"retry"` - RowsPerPartition int64 - ToStorage ToStorageConfig `yaml:"to_storage"` + BatchesPerPartition int `yaml:"batches_per_partition"` + MaxExtractors int `yaml:"max_extractors"` + ExtractorBatchSize int `yaml:"extractor_batch_size"` + ExtractorQueueSize int `yaml:"extractor_queue_size"` + MaxTransformers int `yaml:"max_transformers"` + TransformerBatchSize int `yaml:"transformer_batch_size"` + TransformerQueueSize int `yaml:"transformer_queue_size"` + MaxLoaders int `yaml:"max_loaders"` + LoaderBatchSize int `yaml:"loader_batch_size"` + TruncateTarget bool `yaml:"truncate_target"` + TruncateMethod string `yaml:"truncate_method"` + Retry RetryConfig `yaml:"retry"` + RowsPerPartition int64 + ToStorage ToStorageConfig `yaml:"to_storage"` } type FromJsonItem struct { diff --git a/internal/app/custom_errors/backoff.go b/internal/app/custom_errors/backoff.go index 64e928a..49052c6 100644 --- a/internal/app/custom_errors/backoff.go +++ b/internal/app/custom_errors/backoff.go @@ -1,7 +1,6 @@ package custom_errors import ( - "context" "math/rand" "time" ) @@ -40,22 +39,3 @@ func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJ return delay } - -func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) { - if delay <= 0 { - enqueue() - return - } - - go func() { - timer := time.NewTimer(delay) - defer timer.Stop() - - select { - case <-ctx.Done(): - return - case <-timer.C: - enqueue() - } - }() -} diff --git a/internal/app/custom_errors/loader.error.go b/internal/app/custom_errors/loader.error.go index 927e446..1c12c2e 100644 --- a/internal/app/custom_errors/loader.error.go +++ b/internal/app/custom_errors/loader.error.go @@ -1,11 +1,6 @@ package custom_errors import ( - "context" - "fmt" - "sync" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" ) @@ -17,91 +12,3 @@ type LoaderError struct { func (e *LoaderError) Error() string { return e.Msg } - -func LoaderErrorHandler( - ctx context.Context, - retryConfig config.RetryConfig, - maxChunkErrors int, - chErrorsIn <-chan LoaderError, - chBatchesOut chan<- models.Batch, - chJobErrorsOut chan<- JobError, - wgActiveBatches *sync.WaitGroup, -) { - definitiveErrors := 0 - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - - case err, ok := <-chErrorsIn: - if !ok { - return - } - - if err.Batch.RetryCounter >= retryConfig.Attempts { - wgActiveBatches.Done() - definitiveErrors++ - jobError := JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts), - Prev: &err, - } - - select { - case chJobErrorsOut <- jobError: - case <-ctx.Done(): - return - } - - if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors { - fatalError := JobError{ - ShouldCancelJob: true, - Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors), - Prev: &err, - } - - select { - case chJobErrorsOut <- fatalError: - case <-ctx.Done(): - return - } - } - - continue - } else { - jobError := JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter), - Prev: &err, - } - - select { - case chJobErrorsOut <- jobError: - case <-ctx.Done(): - return - } - } - - err.Batch.RetryCounter++ - delay := ComputeBackoffDelay( - err.Batch.RetryCounter, - retryConfig.BaseDelayMs, - retryConfig.MaxDelayMs, - retryConfig.MaxJitterMs, - ) - - requeueWithBackoff(ctx, delay, func() { - select { - case chBatchesOut <- err.Batch: - case <-ctx.Done(): - return - } - }) - } - } -}