feat: add context support to migration job processing for improved cancellation and error handling

This commit is contained in:
2026-04-09 00:43:11 -05:00
parent b64a76ca45
commit a0b51f40c1
2 changed files with 21 additions and 11 deletions

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -37,6 +38,10 @@ const (
func main() { func main() {
configureLog() configureLog()
startTime := time.Now() startTime := time.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Info("=== Starting migration ===") log.Info("=== Starting migration ===")
log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize) log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize)
@@ -50,7 +55,7 @@ func main() {
for _, job := range migrationJobs { for _, job := range migrationJobs {
log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table) log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table)
processMigrationJob(sourceDb, targetDb, job) processMigrationJob(ctx, sourceDb, targetDb, job)
} }
totalDuration := time.Since(startTime) totalDuration := time.Since(startTime)

View File

@@ -12,7 +12,12 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job MigrationJob) { func processMigrationJob(
ctx context.Context,
sourceDb *sql.DB,
targetDb *pgxpool.Pool,
job MigrationJob,
) {
jobStartTime := time.Now() jobStartTime := time.Now()
log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey) log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
@@ -24,10 +29,10 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
logColumnTypes(sourceColTypes, "Source col types") logColumnTypes(sourceColTypes, "Source col types")
logColumnTypes(targetColTypes, "Target col types") logColumnTypes(targetColTypes, "Target col types")
ctx, cancel := context.WithCancel(context.Background()) jobCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
batches, err := batchGeneratorMssql(ctx, sourceDb, job) batches, err := batchGeneratorMssql(jobCtx, sourceDb, job)
if err != nil { if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
} }
@@ -46,13 +51,13 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
var wgLoaders sync.WaitGroup var wgLoaders sync.WaitGroup
go func() { go func() {
if err := jobErrorHandler(ctx, chJobErrors); err != nil { if err := jobErrorHandler(jobCtx, chJobErrors); err != nil {
cancel() cancel()
} }
}() }()
go extractorErrorHandler(ctx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
go loaderErrorHandler(ctx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
maxExtractors := min(NumExtractors, len(batches)) maxExtractors := min(NumExtractors, len(batches))
log.Infof("Starting %d extractors...", maxExtractors) log.Infof("Starting %d extractors...", maxExtractors)
@@ -60,7 +65,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches) extractFromMssql(jobCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
}) })
} }
@@ -76,7 +81,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
for range maxExtractors { for range maxExtractors {
wgTransformers.Go(func() { wgTransformers.Go(func() {
transformRowsMssql(ctx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks) transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks)
}) })
} }
@@ -85,7 +90,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
for range NumLoaders { for range NumLoaders {
wgLoaders.Go(func() { wgLoaders.Go(func() {
loadRowsPostgres(ctx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks) loadRowsPostgres(jobCtx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
}) })
} }
@@ -111,7 +116,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
cancel() cancel()
}() }()
<-ctx.Done() <-jobCtx.Done()
log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime)) log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime))
} }