From b64a76ca45a50a829ee30817d8e9b5d3807de673 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Thu, 9 Apr 2026 00:38:16 -0500 Subject: [PATCH] feat: improve error handling and job cancellation in migration process --- cmd/go_migrate/job-error-handler.go | 1 + cmd/go_migrate/loader.go | 22 +++++++++++++++++++++- cmd/go_migrate/main.go | 2 +- cmd/go_migrate/process.go | 11 ++++++++--- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/cmd/go_migrate/job-error-handler.go b/cmd/go_migrate/job-error-handler.go index aa7652b..4fc89ff 100644 --- a/cmd/go_migrate/job-error-handler.go +++ b/cmd/go_migrate/job-error-handler.go @@ -37,6 +37,7 @@ func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error { } if err.ShouldCancelJob { + log.Error(err.Msg, " - ", err.Prev) return &err } diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go index 344b0d3..ac096cc 100644 --- a/cmd/go_migrate/loader.go +++ b/cmd/go_migrate/loader.go @@ -3,11 +3,13 @@ package main import ( "context" "database/sql" + "errors" "fmt" "sync" "time" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" mssql "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" @@ -20,6 +22,7 @@ func loadRowsPostgres( columns []ColumnType, chChunksIn <-chan Chunk, chErrorsOut chan<- LoaderError, + chJobErrorsOut chan<- JobError, wgActiveChunks *sync.WaitGroup, ) { tableId := pgx.Identifier{job.Schema, job.Table} @@ -40,7 +43,7 @@ func loadRowsPostgres( return } - if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, wgActiveChunks); abort { + if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks); abort { return } } @@ -54,6 +57,7 @@ func loadChunkPostgres( colNames []string, chunk Chunk, chErrorsOut chan<- LoaderError, + chJobErrorsOut chan<- JobError, wgActiveChunks *sync.WaitGroup, ) (abort bool) { chunkStartTime := time.Now() @@ -65,6 +69,22 @@ func loadChunkPostgres( ) if err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + if pgErr.Code == "23505" { + select { + case chJobErrorsOut <- JobError{ + ShouldCancelJob: true, + Msg: fmt.Sprintf("Fatal data integrity error in table %s", identifier.Sanitize()), + Prev: err, + }: + case <-ctx.Done(): + } + wgActiveChunks.Done() + return true + } + } + select { case chErrorsOut <- LoaderError{Chunk: chunk, Msg: err.Error()}: case <-ctx.Done(): diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 49a7ec5..a33b1dd 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -28,7 +28,7 @@ var migrationJobs []MigrationJob = []MigrationJob{ const ( NumExtractors int = 4 NumLoaders int = 8 - ChunkSize int = 50000 + ChunkSize int = 25000 QueueSize int = 8 ChunksPerBatch int = 16 RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 949a8f5..636a86e 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -32,7 +32,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration log.Error("Unexpected error calculating batch ranges: ", err) } - chJobErrors := make(chan JobError, 100) + chJobErrors := make(chan JobError, 50) chBatches := make(chan Batch, QueueSize) chExtractorErrors := make(chan ExtractorError, QueueSize) chChunksRaw := make(chan Chunk, QueueSize) @@ -45,7 +45,12 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration var wgTransformers sync.WaitGroup var wgLoaders sync.WaitGroup - go jobErrorHandler(ctx, chJobErrors) + go func() { + if err := jobErrorHandler(ctx, chJobErrors); err != nil { + cancel() + } + }() + go extractorErrorHandler(ctx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) go loaderErrorHandler(ctx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) @@ -80,7 +85,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration for range NumLoaders { wgLoaders.Go(func() { - loadRowsPostgres(ctx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, &wgActiveChunks) + loadRowsPostgres(ctx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks) }) }