diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index b4583c2..512e040 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -150,6 +150,7 @@ func processMigrationJob( job.SourceTable, sourceColTypes, job.ExtractorBatchSize, + job.Retry, chPartitions, chBatchesRaw, chJobErrors, diff --git a/internal/app/custom_errors/backoff.go b/internal/app/custom_errors/backoff.go index fc469da..64e928a 100644 --- a/internal/app/custom_errors/backoff.go +++ b/internal/app/custom_errors/backoff.go @@ -6,7 +6,7 @@ import ( "time" ) -func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration { +func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration { if retryCounter < 0 { retryCounter = 0 } diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go index b282be0..1ab5888 100644 --- a/internal/app/custom_errors/extractor.error.go +++ b/internal/app/custom_errors/extractor.error.go @@ -1,13 +1,7 @@ package custom_errors import ( - "context" - "fmt" - "sync" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/google/uuid" ) type ExtractorError struct { @@ -20,100 +14,3 @@ type ExtractorError struct { func (e *ExtractorError) Error() string { return e.Msg } - -func ExtractorErrorHandler( - ctx context.Context, - retryConfig config.RetryConfig, - maxPartitionErrors int, - chErrorsIn <-chan ExtractorError, - chPartitionsOut chan<- models.Partition, - chJobErrorsOut chan<- JobError, - wgActivePartitions *sync.WaitGroup, -) { - definitiveErrors := 0 - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - - case err, ok := <-chErrorsIn: - if !ok { - return - } - - if err.Partition.RetryCounter >= retryConfig.Attempts { - wgActivePartitions.Done() - definitiveErrors++ - jobError := JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts), - Prev: &err, - } - - select { - case chJobErrorsOut <- jobError: - case <-ctx.Done(): - return - } - - if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors { - fatalError := JobError{ - ShouldCancelJob: true, - Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors), - Prev: &err, - } - - select { - case chJobErrorsOut <- fatalError: - case <-ctx.Done(): - return - } - } - - continue - } else { - jobError := JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter), - Prev: &err, - } - - select { - case chJobErrorsOut <- jobError: - case <-ctx.Done(): - return - } - } - - newPartition := err.Partition - newPartition.RetryCounter++ - - delay := computeBackoffDelay( - newPartition.RetryCounter, - retryConfig.BaseDelayMs, - retryConfig.MaxDelayMs, - retryConfig.MaxJitterMs, - ) - - if err.HasLastId { - newPartition.ParentId = err.Partition.Id - newPartition.Id = uuid.New() - newPartition.Range.Min = err.LastId - newPartition.Range.IsMinInclusive = false - } - - requeueWithBackoff(ctx, delay, func() { - select { - case chPartitionsOut <- newPartition: - case <-ctx.Done(): - return - } - }) - } - } -} diff --git a/internal/app/custom_errors/loader.error.go b/internal/app/custom_errors/loader.error.go index 189b72e..927e446 100644 --- a/internal/app/custom_errors/loader.error.go +++ b/internal/app/custom_errors/loader.error.go @@ -88,7 +88,7 @@ func LoaderErrorHandler( } err.Batch.RetryCounter++ - delay := computeBackoffDelay( + delay := ComputeBackoffDelay( err.Batch.RetryCounter, retryConfig.BaseDelayMs, retryConfig.MaxDelayMs, diff --git a/internal/app/etl/extractors/consume.go b/internal/app/etl/extractors/consume.go index f9b2dfd..9884288 100644 --- a/internal/app/etl/extractors/consume.go +++ b/internal/app/etl/extractors/consume.go @@ -19,6 +19,7 @@ func (ex *GenericExtractor) Consume( tableInfo config.SourceTableInfo, columns []models.ColumnType, batchSize int, + retryConfig config.RetryConfig, chPartitionsIn <-chan models.Partition, chBatchesOut chan<- models.Batch, chErrorsOut chan<- custom_errors.JobError, @@ -62,6 +63,7 @@ func (ex *GenericExtractor) Consume( batchSize, partition, indexPrimaryKey, + retryConfig, chBatchesOut, ) wgActivePartitions.Done() diff --git a/internal/app/etl/extractors/process-with-retries.go b/internal/app/etl/extractors/process-with-retries.go index fb65b8d..5837306 100644 --- a/internal/app/etl/extractors/process-with-retries.go +++ b/internal/app/etl/extractors/process-with-retries.go @@ -20,10 +20,10 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries( batchSize int, partition models.Partition, indexPrimaryKey int, + retryConfig config.RetryConfig, chBatchesOut chan<- models.Batch, ) (int64, error) { var totalRowsRead int64 - delay := time.Duration(time.Second * 1) currentParitition := partition for { @@ -46,7 +46,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries( if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok { currentParitition.RetryCounter++ - if currentParitition.RetryCounter > 3 { + if currentParitition.RetryCounter >= retryConfig.Attempts { return totalRowsRead, &custom_errors.JobError{ Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id), Prev: err, @@ -60,6 +60,12 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries( currentParitition.Range.IsMinInclusive = false } + delay := custom_errors.ComputeBackoffDelay( + currentParitition.RetryCounter, + retryConfig.BaseDelayMs, + retryConfig.MaxDelayMs, + retryConfig.MaxJitterMs, + ) time.Sleep(delay) continue }