feat: enhance error handling with JobError struct and update extractor logic

This commit is contained in:
2026-04-08 20:21:58 -05:00
parent bc6f9a6a70
commit e158986947
4 changed files with 65 additions and 20 deletions

View File

@@ -19,10 +19,15 @@ func (e *ExtractorError) Error() string {
const maxRetryAttempts = 3 const maxRetryAttempts = 3
func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chGlobalErrorsOut chan<- error) { func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chJobErrorsOut chan<- JobError) {
for err := range chErrorsIn { for err := range chErrorsIn {
if err.RetryCounter >= maxRetryAttempts { if err.RetryCounter >= maxRetryAttempts {
chGlobalErrorsOut <- fmt.Errorf("batch %v reached max retries (%d): %s", err.Id, maxRetryAttempts, err.Msg) jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
Prev: &err,
}
chJobErrorsOut <- jobError
continue continue
} }

View File

@@ -23,20 +23,18 @@ func extractFromMssql(
chBatchesIn <-chan Batch, chBatchesIn <-chan Batch,
chChunksOut chan<- []UnknownRowValues, chChunksOut chan<- []UnknownRowValues,
chErrorsOut chan<- ExtractorError, chErrorsOut chan<- ExtractorError,
chJobErrorsOut chan<- JobError,
) { ) {
indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool { indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
return strings.EqualFold(col.name, job.PrimaryKey) return strings.EqualFold(col.name, job.PrimaryKey)
}) })
if indexPrimaryKey == -1 { if indexPrimaryKey == -1 {
exError := ExtractorError{ exError := JobError{
Batch: Batch{ ShouldCancelJob: true,
RetryCounter: maxRetryAttempts, Msg: "Primary key not found in provided columns",
},
HasLastId: false,
Msg: "Primary key not found in columns provided",
} }
chErrorsOut <- exError chJobErrorsOut <- exError
return return
} }
@@ -91,6 +89,7 @@ func extractFromMssql(
} }
lastRow := rowsChunk[len(rowsChunk)-1] lastRow := rowsChunk[len(rowsChunk)-1]
chChunksOut <- rowsChunk
chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err) chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
return return
} }

View File

@@ -0,0 +1,33 @@
package main
import (
"fmt"
log "github.com/sirupsen/logrus"
)
type JobError struct {
ShouldCancelJob bool
Msg string
Prev error
}
func (e *JobError) Error() string {
if e.Prev != nil {
return fmt.Sprintf("%s: %v", e.Msg, e.Prev)
}
return e.Msg
}
// jobErrorHandler consumes job-level errors from chErrorsIn until the channel
// is closed. Recoverable errors (ShouldCancelJob == false) are logged and
// processing continues; the first fatal error is returned to the caller so it
// can cancel the job. Returns nil once chErrorsIn is closed without a fatal
// error.
//
// NOTE(review): after a fatal error this function stops draining the channel,
// so any remaining senders will block — confirm the caller cancels them
// promptly before closing the channel.
func jobErrorHandler(chErrorsIn <-chan JobError) error {
	for err := range chErrorsIn {
		if err.ShouldCancelJob {
			return &err
		}
		// JobError.Error has a pointer receiver, so the value `err` does NOT
		// implement the error interface; passing it to log.Error directly
		// would make logrus render the raw struct ("{false msg <nil>}").
		// Log the formatted message explicitly instead.
		log.Error(err.Error())
	}
	return nil
}

View File

@@ -24,18 +24,31 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
logColumnTypes(sourceColTypes, "Source col types") logColumnTypes(sourceColTypes, "Source col types")
logColumnTypes(targetColTypes, "Target col types") logColumnTypes(targetColTypes, "Target col types")
mssqlCtx := context.Background() ctx, cancel := context.WithCancel(context.Background())
batches, err := batchGeneratorMssql(mssqlCtx, sourceDb, job) defer cancel()
batches, err := batchGeneratorMssql(ctx, sourceDb, job)
if err != nil { if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
} }
chJobErrors := make(chan error) chJobErrors := make(chan JobError)
defer close(chJobErrors) defer close(chJobErrors)
go func() {
if err := jobErrorHandler(chJobErrors); err != nil {
cancel()
}
}()
chBatches := make(chan Batch, len(batches)) chBatches := make(chan Batch, len(batches))
chChunks := make(chan []UnknownRowValues, QueueSize)
chExtractorErrors := make(chan ExtractorError, len(batches)) chExtractorErrors := make(chan ExtractorError, len(batches))
go func() {
extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors)
}()
chChunks := make(chan []UnknownRowValues, QueueSize)
maxExtractors := min(NumExtractors, len(batches)) maxExtractors := min(NumExtractors, len(batches))
var wgMssqlExtractors sync.WaitGroup var wgMssqlExtractors sync.WaitGroup
@@ -43,7 +56,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
extractStartTime := time.Now() extractStartTime := time.Now()
for range maxExtractors { for range maxExtractors {
wgMssqlExtractors.Go(func() { wgMssqlExtractors.Go(func() {
extractFromMssql(mssqlCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors) extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors, chJobErrors)
}) })
} }
@@ -55,10 +68,6 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
close(chExtractorErrors) close(chExtractorErrors)
}() }()
go func() {
extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors)
}()
go func() { go func() {
wgMssqlExtractors.Wait() wgMssqlExtractors.Wait()
close(chChunks) close(chChunks)
@@ -83,14 +92,13 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
}() }()
var wgPostgresLoaders sync.WaitGroup var wgPostgresLoaders sync.WaitGroup
postgresLoaderCtx := context.Background()
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders) log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
loaderStartTime := time.Now() loaderStartTime := time.Now()
for range NumLoaders { for range NumLoaders {
wgPostgresLoaders.Go(func() { wgPostgresLoaders.Go(func() {
if err := loadRowsPostgres(postgresLoaderCtx, job, targetColTypes, targetDb, chRowsTransform); err != nil { if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
log.Error("Unexpected error loading data into postgres: ", err) log.Error("Unexpected error loading data into postgres: ", err)
} }
}) })