feat: improve error handling and job cancellation in migration process

This commit is contained in:
2026-04-09 00:38:16 -05:00
parent 51480015ba
commit b64a76ca45
4 changed files with 31 additions and 5 deletions

View File

@@ -37,6 +37,7 @@ func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
} }
if err.ShouldCancelJob { if err.ShouldCancelJob {
log.Error(err.Msg, " - ", err.Prev)
return &err return &err
} }

View File

@@ -3,11 +3,13 @@ package main
import ( import (
"context" "context"
"database/sql" "database/sql"
"errors"
"fmt" "fmt"
"sync" "sync"
"time" "time"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
mssql "github.com/microsoft/go-mssqldb" mssql "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -20,6 +22,7 @@ func loadRowsPostgres(
columns []ColumnType, columns []ColumnType,
chChunksIn <-chan Chunk, chChunksIn <-chan Chunk,
chErrorsOut chan<- LoaderError, chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup, wgActiveChunks *sync.WaitGroup,
) { ) {
tableId := pgx.Identifier{job.Schema, job.Table} tableId := pgx.Identifier{job.Schema, job.Table}
@@ -40,7 +43,7 @@ func loadRowsPostgres(
return return
} }
if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, wgActiveChunks); abort { if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks); abort {
return return
} }
} }
@@ -54,6 +57,7 @@ func loadChunkPostgres(
colNames []string, colNames []string,
chunk Chunk, chunk Chunk,
chErrorsOut chan<- LoaderError, chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup, wgActiveChunks *sync.WaitGroup,
) (abort bool) { ) (abort bool) {
chunkStartTime := time.Now() chunkStartTime := time.Now()
@@ -65,6 +69,22 @@ func loadChunkPostgres(
) )
if err != nil { if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == "23505" {
select {
case chJobErrorsOut <- JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal data integrity error in table %s", identifier.Sanitize()),
Prev: err,
}:
case <-ctx.Done():
}
wgActiveChunks.Done()
return true
}
}
select { select {
case chErrorsOut <- LoaderError{Chunk: chunk, Msg: err.Error()}: case chErrorsOut <- LoaderError{Chunk: chunk, Msg: err.Error()}:
case <-ctx.Done(): case <-ctx.Done():

View File

@@ -28,7 +28,7 @@ var migrationJobs []MigrationJob = []MigrationJob{
const ( const (
NumExtractors int = 4 NumExtractors int = 4
NumLoaders int = 8 NumLoaders int = 8
ChunkSize int = 50000 ChunkSize int = 25000
QueueSize int = 8 QueueSize int = 8
ChunksPerBatch int = 16 ChunksPerBatch int = 16
RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch) RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch)

View File

@@ -32,7 +32,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
} }
chJobErrors := make(chan JobError, 100) chJobErrors := make(chan JobError, 50)
chBatches := make(chan Batch, QueueSize) chBatches := make(chan Batch, QueueSize)
chExtractorErrors := make(chan ExtractorError, QueueSize) chExtractorErrors := make(chan ExtractorError, QueueSize)
chChunksRaw := make(chan Chunk, QueueSize) chChunksRaw := make(chan Chunk, QueueSize)
@@ -45,7 +45,12 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
var wgTransformers sync.WaitGroup var wgTransformers sync.WaitGroup
var wgLoaders sync.WaitGroup var wgLoaders sync.WaitGroup
go jobErrorHandler(ctx, chJobErrors) go func() {
if err := jobErrorHandler(ctx, chJobErrors); err != nil {
cancel()
}
}()
go extractorErrorHandler(ctx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) go extractorErrorHandler(ctx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
go loaderErrorHandler(ctx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) go loaderErrorHandler(ctx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
@@ -80,7 +85,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
for range NumLoaders { for range NumLoaders {
wgLoaders.Go(func() { wgLoaders.Go(func() {
loadRowsPostgres(ctx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, &wgActiveChunks) loadRowsPostgres(ctx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
}) })
} }