refactor: enhance retry handling in extractor processes; unify backoff delay computation

This commit is contained in:
2026-05-05 23:16:13 -05:00
parent 7cb959a103
commit 2a5f703f3c
6 changed files with 13 additions and 107 deletions

View File

@@ -150,6 +150,7 @@ func processMigrationJob(
job.SourceTable, job.SourceTable,
sourceColTypes, sourceColTypes,
job.ExtractorBatchSize, job.ExtractorBatchSize,
job.Retry,
chPartitions, chPartitions,
chBatchesRaw, chBatchesRaw,
chJobErrors, chJobErrors,

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
) )
func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration { func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
if retryCounter < 0 { if retryCounter < 0 {
retryCounter = 0 retryCounter = 0
} }

View File

@@ -1,13 +1,7 @@
package custom_errors package custom_errors
import ( import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
) )
type ExtractorError struct { type ExtractorError struct {
@@ -20,100 +14,3 @@ type ExtractorError struct {
func (e *ExtractorError) Error() string { func (e *ExtractorError) Error() string {
return e.Msg return e.Msg
} }
func ExtractorErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxPartitionErrors int,
chErrorsIn <-chan ExtractorError,
chPartitionsOut chan<- models.Partition,
chJobErrorsOut chan<- JobError,
wgActivePartitions *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Partition.RetryCounter >= retryConfig.Attempts {
wgActivePartitions.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
newPartition := err.Partition
newPartition.RetryCounter++
delay := computeBackoffDelay(
newPartition.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
if err.HasLastId {
newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New()
newPartition.Range.Min = err.LastId
newPartition.Range.IsMinInclusive = false
}
requeueWithBackoff(ctx, delay, func() {
select {
case chPartitionsOut <- newPartition:
case <-ctx.Done():
return
}
})
}
}
}

View File

@@ -88,7 +88,7 @@ func LoaderErrorHandler(
} }
err.Batch.RetryCounter++ err.Batch.RetryCounter++
delay := computeBackoffDelay( delay := ComputeBackoffDelay(
err.Batch.RetryCounter, err.Batch.RetryCounter,
retryConfig.BaseDelayMs, retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs, retryConfig.MaxDelayMs,

View File

@@ -19,6 +19,7 @@ func (ex *GenericExtractor) Consume(
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
batchSize int, batchSize int,
retryConfig config.RetryConfig,
chPartitionsIn <-chan models.Partition, chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.JobError, chErrorsOut chan<- custom_errors.JobError,
@@ -62,6 +63,7 @@ func (ex *GenericExtractor) Consume(
batchSize, batchSize,
partition, partition,
indexPrimaryKey, indexPrimaryKey,
retryConfig,
chBatchesOut, chBatchesOut,
) )
wgActivePartitions.Done() wgActivePartitions.Done()

View File

@@ -20,10 +20,10 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
batchSize int, batchSize int,
partition models.Partition, partition models.Partition,
indexPrimaryKey int, indexPrimaryKey int,
retryConfig config.RetryConfig,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int64, error) { ) (int64, error) {
var totalRowsRead int64 var totalRowsRead int64
delay := time.Duration(time.Second * 1)
currentParitition := partition currentParitition := partition
for { for {
@@ -46,7 +46,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok { if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
currentParitition.RetryCounter++ currentParitition.RetryCounter++
if currentParitition.RetryCounter > 3 { if currentParitition.RetryCounter >= retryConfig.Attempts {
return totalRowsRead, &custom_errors.JobError{ return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id), Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
Prev: err, Prev: err,
@@ -60,6 +60,12 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
currentParitition.Range.IsMinInclusive = false currentParitition.Range.IsMinInclusive = false
} }
delay := custom_errors.ComputeBackoffDelay(
currentParitition.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
time.Sleep(delay) time.Sleep(delay)
continue continue
} }