diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 9a2fac3..3930922 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -95,8 +95,22 @@ func processMigrationJob( } }() - go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chPartitions, chJobErrors, &wgActivePartitions) - go custom_errors.LoaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chBatchesTransformed, chJobErrors, &wgActiveBatches) + go custom_errors.ExtractorErrorHandler( + jobCtx, + job.Retry, + chExtractorErrors, + chPartitions, + chJobErrors, + &wgActivePartitions, + ) + go custom_errors.LoaderErrorHandler( + jobCtx, + job.Retry, + chLoadersErrors, + chBatchesTransformed, + chJobErrors, + &wgActiveBatches, + ) maxExtractors := min(job.MaxExtractors, len(partitions)) log.Infof("Starting %d extractor(s)...", maxExtractors) diff --git a/config.yaml b/config.yaml index 4012875..ae6243a 100644 --- a/config.yaml +++ b/config.yaml @@ -12,6 +12,9 @@ defaults: truncate_method: TRUNCATE # TRUNCATE | DELETE retry: attempts: 3 + base_delay_ms: 500 + max_delay_ms: 10000 + max_jitter_ms: 500 jobs: - name: cartografia_manzana diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index 2574279..4819c4a 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -9,6 +9,9 @@ import ( type RetryConfig struct { Attempts int `yaml:"attempts"` + BaseDelayMs int `yaml:"base_delay_ms"` + MaxDelayMs int `yaml:"max_delay_ms"` + MaxJitterMs int `yaml:"max_jitter_ms"` } type JobConfig struct { diff --git a/internal/app/custom_errors/backoff.go b/internal/app/custom_errors/backoff.go new file mode 100644 index 0000000..fc469da --- /dev/null +++ b/internal/app/custom_errors/backoff.go @@ -0,0 +1,61 @@ +package custom_errors + +import ( + "context" + "math/rand" + "time" +) + +func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration { + if retryCounter < 0 { + retryCounter = 0 + } + + delay := max(time.Duration(baseDelayMs)*time.Millisecond, 0) + + maxDelay := time.Duration(maxDelayMs) * time.Millisecond + for i := 0; i < retryCounter; i++ { + if maxDelayMs > 0 && delay >= maxDelay { + delay = maxDelay + break + } + if delay == 0 { + break + } + delay *= 2 + } + + if maxDelayMs > 0 && delay > maxDelay { + delay = maxDelay + } + + if maxJitterMs > 0 { + jitter := time.Duration(rand.Intn(maxJitterMs+1)) * time.Millisecond + delay += jitter + } + + if delay < 0 { + delay = 0 + } + + return delay +} + +func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) { + if delay <= 0 { + enqueue() + return + } + + go func() { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-ctx.Done(): + return + case <-timer.C: + enqueue() + } + }() +} diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go index 95373dc..5ab02e7 100644 --- a/internal/app/custom_errors/extractor.error.go +++ b/internal/app/custom_errors/extractor.error.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" ) @@ -22,7 +23,7 @@ func (e *ExtractorError) Error() string { func ExtractorErrorHandler( ctx context.Context, - maxRetryAttempts int, + retryConfig config.RetryConfig, chErrorsIn <-chan ExtractorError, chPartitionsOut chan<- models.Partition, chJobErrorsOut chan<- JobError, @@ -42,11 +43,11 @@ func ExtractorErrorHandler( return } - if err.Partition.RetryCounter >= maxRetryAttempts { + if err.Partition.RetryCounter >= retryConfig.Attempts { wgActivePartitions.Done() jobError := JobError{ ShouldCancelJob: false, - Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, maxRetryAttempts), + Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts), Prev: &err, } @@ -74,6 +75,13 @@ func ExtractorErrorHandler( newPartition := err.Partition newPartition.RetryCounter++ + delay := computeBackoffDelay( + newPartition.RetryCounter, + retryConfig.BaseDelayMs, + retryConfig.MaxDelayMs, + retryConfig.MaxJitterMs, + ) + if err.HasLastId { newPartition.ParentId = err.Partition.Id newPartition.Id = uuid.New() @@ -81,11 +89,13 @@ func ExtractorErrorHandler( newPartition.IsLowerLimitInclusive = false } - select { - case chPartitionsOut <- newPartition: - case <-ctx.Done(): - return - } + requeueWithBackoff(ctx, delay, func() { + select { + case chPartitionsOut <- newPartition: + case <-ctx.Done(): + return + } + }) } } } diff --git a/internal/app/custom_errors/loader.error.go b/internal/app/custom_errors/loader.error.go index aabe416..30dae24 100644 --- a/internal/app/custom_errors/loader.error.go +++ b/internal/app/custom_errors/loader.error.go @@ -5,12 +5,13 @@ import ( "fmt" "sync" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" ) type LoaderError struct { - models.Batch - Msg string + Batch models.Batch + Msg string } func (e *LoaderError) Error() string { @@ -19,7 +20,7 @@ func (e *LoaderError) Error() string { func LoaderErrorHandler( ctx context.Context, - maxRetryAttempts int, + retryConfig config.RetryConfig, chErrorsIn <-chan LoaderError, chBatchesOut chan<- models.Batch, chJobErrorsOut chan<- JobError, @@ -39,11 +40,11 @@ func LoaderErrorHandler( return } - if err.RetryCounter >= maxRetryAttempts { + if err.Batch.RetryCounter >= retryConfig.Attempts { wgActiveBatches.Done() jobError := JobError{ ShouldCancelJob: false, - Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Id, maxRetryAttempts), + Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts), Prev: &err, } @@ -68,13 +69,21 @@ func LoaderErrorHandler( } } - err.RetryCounter++ + err.Batch.RetryCounter++ + delay := computeBackoffDelay( + err.Batch.RetryCounter, + retryConfig.BaseDelayMs, + retryConfig.MaxDelayMs, + retryConfig.MaxJitterMs, + ) - select { - case chBatchesOut <- err.Batch: - case <-ctx.Done(): - return - } + requeueWithBackoff(ctx, delay, func() { + select { + case chBatchesOut <- err.Batch: + case <-ctx.Done(): + return + } + }) } } } diff --git a/internal/app/etl/table_analyzers/mssql.go b/internal/app/etl/table_analyzers/mssql.go index cf8cd57..abe6ac6 100644 --- a/internal/app/etl/table_analyzers/mssql.go +++ b/internal/app/etl/table_analyzers/mssql.go @@ -35,8 +35,7 @@ JOIN sys.types t ON c.user_type_id = t.user_type_id LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id JOIN sys.tables st ON c.object_id = st.object_id JOIN sys.schemas s ON st.schema_id = s.schema_id -WHERE s.name = @schema AND st.name = @table - AND c.name NOT LIKE 'graph_id%' +WHERE s.name = @schema AND st.name = @table AND c.name NOT LIKE 'graph_id%' ORDER BY c.column_id;` type rawColumnMssql struct {