package custom_errors import ( "context" "fmt" "sync" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" ) type ExtractorError struct { Partition models.Partition LastId int64 HasLastId bool Msg string } func (e *ExtractorError) Error() string { return e.Msg } func ExtractorErrorHandler( ctx context.Context, retryConfig config.RetryConfig, maxPartitionErrors int, chErrorsIn <-chan ExtractorError, chPartitionsOut chan<- models.Partition, chJobErrorsOut chan<- JobError, wgActivePartitions *sync.WaitGroup, ) { definitiveErrors := 0 for { if ctx.Err() != nil { return } select { case <-ctx.Done(): return case err, ok := <-chErrorsIn: if !ok { return } if err.Partition.RetryCounter >= retryConfig.Attempts { wgActivePartitions.Done() definitiveErrors++ jobError := JobError{ ShouldCancelJob: false, Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts), Prev: &err, } select { case chJobErrorsOut <- jobError: case <-ctx.Done(): return } if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors { fatalError := JobError{ ShouldCancelJob: true, Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors), Prev: &err, } select { case chJobErrorsOut <- fatalError: case <-ctx.Done(): return } } continue } else { jobError := JobError{ ShouldCancelJob: false, Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter), Prev: &err, } select { case chJobErrorsOut <- jobError: case <-ctx.Done(): return } } newPartition := err.Partition newPartition.RetryCounter++ delay := computeBackoffDelay( newPartition.RetryCounter, retryConfig.BaseDelayMs, retryConfig.MaxDelayMs, retryConfig.MaxJitterMs, ) if err.HasLastId { newPartition.ParentId = err.Partition.Id newPartition.Id = uuid.New() newPartition.LowerLimit = err.LastId newPartition.IsLowerLimitInclusive = false } requeueWithBackoff(ctx, delay, func() { select { case chPartitionsOut <- newPartition: case <-ctx.Done(): return } }) } } }