feat: implement exponential backoff strategy for error handling in extractor and loader processes; enhance retry configuration options

2026-04-12 20:35:29 -05:00
parent 5633dc98d0
commit f126d5bbd0
7 changed files with 122 additions and 23 deletions

View File

@@ -95,8 +95,22 @@ func processMigrationJob(
 		}
 	}()
-	go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chPartitions, chJobErrors, &wgActivePartitions)
-	go custom_errors.LoaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chBatchesTransformed, chJobErrors, &wgActiveBatches)
+	go custom_errors.ExtractorErrorHandler(
+		jobCtx,
+		job.Retry,
+		chExtractorErrors,
+		chPartitions,
+		chJobErrors,
+		&wgActivePartitions,
+	)
+	go custom_errors.LoaderErrorHandler(
+		jobCtx,
+		job.Retry,
+		chLoadersErrors,
+		chBatchesTransformed,
+		chJobErrors,
+		&wgActiveBatches,
+	)
 	maxExtractors := min(job.MaxExtractors, len(partitions))
 	log.Infof("Starting %d extractor(s)...", maxExtractors)

View File

@@ -12,6 +12,9 @@ defaults:
   truncate_method: TRUNCATE # TRUNCATE | DELETE
   retry:
     attempts: 3
+    base_delay_ms: 500
+    max_delay_ms: 10000
+    max_jitter_ms: 500
 jobs:
   - name: cartografia_manzana
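For intuition, the defaults above yield roughly the following schedule. A standalone sketch, not part of the commit, under the assumption (confirmed by the handlers below) that the retry counter is incremented before the delay is computed, so the first requeue already doubles the base:

package main

import (
	"fmt"
	"time"
)

func main() {
	base := 500 * time.Millisecond // base_delay_ms
	maxDelay := 10 * time.Second   // max_delay_ms
	// With attempts: 3, only retries 1-3 actually happen.
	for retry := 1; retry <= 5; retry++ {
		delay := base << uint(retry) // base * 2^retry
		if delay > maxDelay {
			delay = maxDelay
		}
		fmt.Printf("retry %d: ~%v plus 0-500ms jitter\n", retry, delay)
	}
}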

View File

@@ -9,6 +9,9 @@ import (
 type RetryConfig struct {
 	Attempts    int `yaml:"attempts"`
+	BaseDelayMs int `yaml:"base_delay_ms"`
+	MaxDelayMs  int `yaml:"max_delay_ms"`
+	MaxJitterMs int `yaml:"max_jitter_ms"`
 }
 type JobConfig struct {
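The new fields map one-to-one onto the YAML keys above. A minimal decoding sketch, assuming a yaml library such as gopkg.in/yaml.v3 (the decoder actually used by go-migrate is not visible in this diff):

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

type RetryConfig struct {
	Attempts    int `yaml:"attempts"`
	BaseDelayMs int `yaml:"base_delay_ms"`
	MaxDelayMs  int `yaml:"max_delay_ms"`
	MaxJitterMs int `yaml:"max_jitter_ms"`
}

func main() {
	raw := []byte("attempts: 3\nbase_delay_ms: 500\nmax_delay_ms: 10000\nmax_jitter_ms: 500\n")
	var rc RetryConfig
	if err := yaml.Unmarshal(raw, &rc); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", rc) // {Attempts:3 BaseDelayMs:500 MaxDelayMs:10000 MaxJitterMs:500}
}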

View File

@@ -0,0 +1,61 @@
+package custom_errors
+
+import (
+	"context"
+	"math/rand"
+	"time"
+)
+
+// computeBackoffDelay returns the wait before the given retry attempt:
+// baseDelayMs doubled retryCounter times, capped at maxDelayMs, plus up
+// to maxJitterMs of random jitter to spread out concurrent retries.
+func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
+	if retryCounter < 0 {
+		retryCounter = 0
+	}
+	delay := max(time.Duration(baseDelayMs)*time.Millisecond, 0)
+	maxDelay := time.Duration(maxDelayMs) * time.Millisecond
+	for i := 0; i < retryCounter; i++ {
+		if maxDelayMs > 0 && delay >= maxDelay {
+			delay = maxDelay
+			break
+		}
+		if delay == 0 {
+			break
+		}
+		delay *= 2
+	}
+	if maxDelayMs > 0 && delay > maxDelay {
+		delay = maxDelay
+	}
+	if maxJitterMs > 0 {
+		jitter := time.Duration(rand.Intn(maxJitterMs+1)) * time.Millisecond
+		delay += jitter
+	}
+	if delay < 0 {
+		delay = 0
+	}
+	return delay
+}
+
+// requeueWithBackoff invokes enqueue after delay on a separate goroutine,
+// or immediately on the caller's goroutine when there is no delay. The
+// pending requeue is dropped if ctx is cancelled first.
+func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) {
+	if delay <= 0 {
+		enqueue()
+		return
+	}
+	go func() {
+		timer := time.NewTimer(delay)
+		defer timer.Stop()
+		select {
+		case <-ctx.Done():
+			return
+		case <-timer.C:
+			enqueue()
+		}
+	}()
+}
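A quick bounds check one might write against computeBackoffDelay (a hypothetical test, not part of this commit): every delay must be non-negative and can never exceed the cap plus the maximum jitter.

package custom_errors

import (
	"testing"
	"time"
)

func TestComputeBackoffDelayBounds(t *testing.T) {
	upper := 10*time.Second + 500*time.Millisecond // cap + max jitter
	for retry := 0; retry < 10; retry++ {
		d := computeBackoffDelay(retry, 500, 10000, 500)
		if d < 0 || d > upper {
			t.Fatalf("retry %d: delay %v outside [0, %v]", retry, d, upper)
		}
	}
}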

View File

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"sync"
 
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 	"github.com/google/uuid"
 )
@@ -22,7 +23,7 @@ func (e *ExtractorError) Error() string {
 func ExtractorErrorHandler(
 	ctx context.Context,
-	maxRetryAttempts int,
+	retryConfig config.RetryConfig,
 	chErrorsIn <-chan ExtractorError,
 	chPartitionsOut chan<- models.Partition,
 	chJobErrorsOut chan<- JobError,
@@ -42,11 +43,11 @@ func ExtractorErrorHandler(
 			return
 		}
-		if err.Partition.RetryCounter >= maxRetryAttempts {
+		if err.Partition.RetryCounter >= retryConfig.Attempts {
 			wgActivePartitions.Done()
 			jobError := JobError{
 				ShouldCancelJob: false,
-				Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, maxRetryAttempts),
+				Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
 				Prev: &err,
 			}
@@ -74,6 +75,13 @@ func ExtractorErrorHandler(
 		newPartition := err.Partition
 		newPartition.RetryCounter++
+		delay := computeBackoffDelay(
+			newPartition.RetryCounter,
+			retryConfig.BaseDelayMs,
+			retryConfig.MaxDelayMs,
+			retryConfig.MaxJitterMs,
+		)
 		if err.HasLastId {
 			newPartition.ParentId = err.Partition.Id
 			newPartition.Id = uuid.New()
@@ -81,11 +89,13 @@ func ExtractorErrorHandler(
 			newPartition.IsLowerLimitInclusive = false
 		}
+		requeueWithBackoff(ctx, delay, func() {
 			select {
 			case chPartitionsOut <- newPartition:
 			case <-ctx.Done():
 				return
 			}
+		})
 	}
 }
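Note that requeueWithBackoff defers the channel send to its own goroutine, so the handler loop keeps draining chErrorsIn while a partition waits out its delay, and a requeue whose delay outlives the job context is silently dropped. A standalone sketch of that cancellation property (hypothetical names, mirroring the helper above):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan string, 1)

	// Mirrors requeueWithBackoff: enqueue after the delay unless the
	// context is cancelled first.
	go func() {
		timer := time.NewTimer(50 * time.Millisecond)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return // pending requeue dropped
		case <-timer.C:
			ch <- "partition-1"
		}
	}()

	cancel() // job cancelled before the delay elapses

	select {
	case p := <-ch:
		fmt.Println("requeued:", p)
	case <-time.After(200 * time.Millisecond):
		fmt.Println("requeue dropped after cancellation") // expected branch
	}
}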

View File

@@ -5,11 +5,12 @@ import (
 	"fmt"
 	"sync"
 
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 )
 
 type LoaderError struct {
-	models.Batch
+	Batch models.Batch
 	Msg string
 }
@@ -19,7 +20,7 @@ func (e *LoaderError) Error() string {
 func LoaderErrorHandler(
 	ctx context.Context,
-	maxRetryAttempts int,
+	retryConfig config.RetryConfig,
 	chErrorsIn <-chan LoaderError,
 	chBatchesOut chan<- models.Batch,
 	chJobErrorsOut chan<- JobError,
@@ -39,11 +40,11 @@ func LoaderErrorHandler(
 			return
 		}
-		if err.RetryCounter >= maxRetryAttempts {
+		if err.Batch.RetryCounter >= retryConfig.Attempts {
 			wgActiveBatches.Done()
 			jobError := JobError{
 				ShouldCancelJob: false,
-				Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
+				Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
 				Prev: &err,
 			}
@@ -68,13 +69,21 @@ func LoaderErrorHandler(
 			}
 		}
-		err.RetryCounter++
+		err.Batch.RetryCounter++
+		delay := computeBackoffDelay(
+			err.Batch.RetryCounter,
+			retryConfig.BaseDelayMs,
+			retryConfig.MaxDelayMs,
+			retryConfig.MaxJitterMs,
+		)
+		requeueWithBackoff(ctx, delay, func() {
 			select {
 			case chBatchesOut <- err.Batch:
 			case <-ctx.Done():
 				return
 			}
+		})
 	}
 }
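The renames from err.RetryCounter to err.Batch.RetryCounter above fall out of the struct change at the top of this file: with an embedded models.Batch the batch fields were promoted onto LoaderError, while the named Batch field requires explicit selection. A minimal illustration with hypothetical types:

package main

import "fmt"

type Batch struct{ RetryCounter int }

// Embedded struct: Batch's fields are promoted onto the outer type.
type EmbeddedError struct {
	Batch
	Msg string
}

// Named field: the same fields must be reached through Batch.
type NamedError struct {
	Batch Batch
	Msg   string
}

func main() {
	e := EmbeddedError{Batch{RetryCounter: 2}, "boom"}
	n := NamedError{Batch{RetryCounter: 2}, "boom"}
	fmt.Println(e.RetryCounter, n.Batch.RetryCounter) // 2 2
}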

View File

@@ -35,8 +35,7 @@ JOIN sys.types t ON c.user_type_id = t.user_type_id
 LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
 JOIN sys.tables st ON c.object_id = st.object_id
 JOIN sys.schemas s ON st.schema_id = s.schema_id
-WHERE s.name = @schema AND st.name = @table
-AND c.name NOT LIKE 'graph_id%'
+WHERE s.name = @schema AND st.name = @table AND c.name NOT LIKE 'graph_id%'
 ORDER BY c.column_id;`
 type rawColumnMssql struct {
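The widened WHERE clause keeps excluding SQL Server's internal graph columns: tables created AS NODE or AS EDGE carry hidden columns named graph_id_<hex> that the migration should not try to copy, so the NOT LIKE 'graph_id%' filter is simply folded into the main predicate rather than changed.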