From 0ee5d9032c72fcbc747185cef5be4af01abd8956 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Wed, 8 Apr 2026 23:07:41 -0500 Subject: [PATCH] feat: add context support to error handlers for improved cancellation and error management --- cmd/go_migrate/extractor-error-handler.go | 71 ++++++++++++++++------- cmd/go_migrate/job-error-handler.go | 29 ++++++--- cmd/go_migrate/process.go | 8 ++- 3 files changed, 76 insertions(+), 32 deletions(-) diff --git a/cmd/go_migrate/extractor-error-handler.go b/cmd/go_migrate/extractor-error-handler.go index 3462bf1..e48af32 100644 --- a/cmd/go_migrate/extractor-error-handler.go +++ b/cmd/go_migrate/extractor-error-handler.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "github.com/google/uuid" @@ -19,29 +20,57 @@ func (e *ExtractorError) Error() string { const maxRetryAttempts = 3 -func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chJobErrorsOut chan<- JobError) { - for err := range chErrorsIn { - if err.RetryCounter >= maxRetryAttempts { - jobError := JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts), - Prev: &err, +func extractorErrorHandler( + ctx context.Context, + chErrorsIn <-chan ExtractorError, + chBatchesOut chan<- Batch, + chJobErrorsOut chan<- JobError, +) { + for { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + return + + case err, ok := <-chErrorsIn: + if !ok { + return + } + + if err.RetryCounter >= maxRetryAttempts { + jobError := JobError{ + ShouldCancelJob: false, + Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts), + Prev: &err, + } + + select { + case chJobErrorsOut <- jobError: + case <-ctx.Done(): + return + } + continue + } + + newBatch := err.Batch + newBatch.RetryCounter++ + + if err.HasLastId { + newBatch.ParentId = err.Id + newBatch.Id = uuid.New() + newBatch.LowerLimit = err.LastId + newBatch.IsLowerLimitInclusive = false + } + + select { + case chBatchesOut <- newBatch: + case <-ctx.Done(): + return } - chJobErrorsOut <- jobError - continue } - - newBatch := err.Batch - newBatch.RetryCounter++ - - if err.HasLastId { - newBatch.ParentId = err.Id - newBatch.Id = uuid.New() - newBatch.LowerLimit = err.LastId - newBatch.IsLowerLimitInclusive = false - } - - chBatchesOut <- newBatch } } diff --git a/cmd/go_migrate/job-error-handler.go b/cmd/go_migrate/job-error-handler.go index 1887423..5930bc9 100644 --- a/cmd/go_migrate/job-error-handler.go +++ b/cmd/go_migrate/job-error-handler.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" log "github.com/sirupsen/logrus" @@ -16,18 +17,30 @@ func (e *JobError) Error() string { if e.Prev != nil { return fmt.Sprintf("%s: %v", e.Msg, e.Prev) } - + return e.Msg } -func jobErrorHandler(chErrorsIn <-chan JobError) error { - for err := range chErrorsIn { - if err.ShouldCancelJob { - return &err +func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error { + for { + if ctx.Err() != nil { + return nil } - log.Error(err) - } + select { + case <-ctx.Done(): + return nil - return nil + case err, ok := <-chErrorsIn: + if !ok { + return nil + } + + if err.ShouldCancelJob { + return &err + } + + log.Error(err) + } + } } diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 08efbf2..db25289 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -36,8 +36,10 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration defer close(chJobErrors) go func() { - if err := jobErrorHandler(chJobErrors); err != nil { - cancel() + if err := jobErrorHandler(ctx, chJobErrors); err != nil { + if ctx.Err() == nil { + cancel() + } } }() @@ -45,7 +47,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration chExtractorErrors := make(chan ExtractorError, len(batches)) go func() { - extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors) + extractorErrorHandler(ctx, chExtractorErrors, chBatches, chJobErrors) }() chChunks := make(chan Chunk, QueueSize)