diff --git a/cmd/go_migrate/extractor-error-handler.go b/cmd/go_migrate/extractor-error-handler.go index 5577cab..3462bf1 100644 --- a/cmd/go_migrate/extractor-error-handler.go +++ b/cmd/go_migrate/extractor-error-handler.go @@ -19,10 +19,15 @@ func (e *ExtractorError) Error() string { const maxRetryAttempts = 3 -func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chGlobalErrorsOut chan<- error) { +func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chJobErrorsOut chan<- JobError) { for err := range chErrorsIn { if err.RetryCounter >= maxRetryAttempts { - chGlobalErrorsOut <- fmt.Errorf("batch %v reached max retries (%d): %s", err.Id, maxRetryAttempts, err.Msg) + jobError := JobError{ + ShouldCancelJob: false, + Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts), + Prev: &err, + } + chJobErrorsOut <- jobError continue } diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go index f971db4..66053dd 100644 --- a/cmd/go_migrate/extractor.go +++ b/cmd/go_migrate/extractor.go @@ -23,20 +23,18 @@ func extractFromMssql( chBatchesIn <-chan Batch, chChunksOut chan<- []UnknownRowValues, chErrorsOut chan<- ExtractorError, + chJobErrorsOut chan<- JobError, ) { indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool { return strings.EqualFold(col.name, job.PrimaryKey) }) if indexPrimaryKey == -1 { - exError := ExtractorError{ - Batch: Batch{ - RetryCounter: maxRetryAttempts, - }, - HasLastId: false, - Msg: "Primary key not found in columns provided", + exError := JobError{ + ShouldCancelJob: true, + Msg: "Primary key not found in provided columns", } - chErrorsOut <- exError + chJobErrorsOut <- exError return } @@ -91,6 +89,7 @@ func extractFromMssql( } lastRow := rowsChunk[len(rowsChunk)-1] + chChunksOut <- rowsChunk chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err) return } diff --git 
a/cmd/go_migrate/job-error-handler.go b/cmd/go_migrate/job-error-handler.go new file mode 100644 index 0000000..1887423 --- /dev/null +++ b/cmd/go_migrate/job-error-handler.go @@ -0,0 +1,33 @@ +package main + +import ( + "fmt" + + log "github.com/sirupsen/logrus" +) + +type JobError struct { + ShouldCancelJob bool + Msg string + Prev error +} + +func (e *JobError) Error() string { + if e.Prev != nil { + return fmt.Sprintf("%s: %v", e.Msg, e.Prev) + } + + return e.Msg +} + +func jobErrorHandler(chErrorsIn <-chan JobError) error { + for err := range chErrorsIn { + if err.ShouldCancelJob { + return &err + } + + log.Error(&err) + } + + return nil +} diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 758b89f..c2d2311 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -24,18 +24,31 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration logColumnTypes(sourceColTypes, "Source col types") logColumnTypes(targetColTypes, "Target col types") - mssqlCtx := context.Background() - batches, err := batchGeneratorMssql(mssqlCtx, sourceDb, job) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + batches, err := batchGeneratorMssql(ctx, sourceDb, job) if err != nil { log.Error("Unexpected error calculating batch ranges: ", err) } - chJobErrors := make(chan error) + chJobErrors := make(chan JobError) defer close(chJobErrors) + go func() { + if err := jobErrorHandler(chJobErrors); err != nil { + cancel() + } + }() + chBatches := make(chan Batch, len(batches)) - chChunks := make(chan []UnknownRowValues, QueueSize) chExtractorErrors := make(chan ExtractorError, len(batches)) + + go func() { + extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors) + }() + + chChunks := make(chan []UnknownRowValues, QueueSize) maxExtractors := min(NumExtractors, len(batches)) var wgMssqlExtractors sync.WaitGroup @@ -43,7 +56,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job 
Migration extractStartTime := time.Now() for range maxExtractors { wgMssqlExtractors.Go(func() { - extractFromMssql(mssqlCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors) + extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors, chJobErrors) }) } @@ -55,10 +68,6 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration close(chExtractorErrors) }() - go func() { - extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors) - }() - go func() { wgMssqlExtractors.Wait() close(chChunks) @@ -83,14 +92,13 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration }() var wgPostgresLoaders sync.WaitGroup - postgresLoaderCtx := context.Background() log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders) loaderStartTime := time.Now() for range NumLoaders { wgPostgresLoaders.Go(func() { - if err := loadRowsPostgres(postgresLoaderCtx, job, targetColTypes, targetDb, chRowsTransform); err != nil { + if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chRowsTransform); err != nil { log.Error("Unexpected error loading data into postgres: ", err) } })