From 73b65e2a3fb765273bc36dd4f4d96442d5f40340 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Fri, 17 Apr 2026 00:07:51 -0500 Subject: [PATCH] refactor: remove extractor error channel and simplify retry logic in mssql and postgres extractors --- cmd/go_migrate/process.go | 13 ------ internal/app/etl/extractors/mssql.go | 44 +++++++++--------- internal/app/etl/extractors/postgres.go | 61 +++++++++++++++++++++++-- 3 files changed, 80 insertions(+), 38 deletions(-) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 16ae071..6058ff7 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -85,7 +85,6 @@ func processMigrationJob( } chJobErrors := make(chan custom_errors.JobError, job.QueueSize) - chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize) chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize) chPartitions := make(chan models.Partition, job.QueueSize) chBatchesRaw := make(chan models.Batch, job.QueueSize) @@ -105,15 +104,6 @@ func processMigrationJob( } }() - go custom_errors.ExtractorErrorHandler( - localCtx, - job.Retry, - job.MaxPartitionErrrors, - chExtractorErrors, - chPartitions, - chJobErrors, - &wgActivePartitions, - ) go custom_errors.LoaderErrorHandler( localCtx, job.Retry, @@ -136,7 +126,6 @@ func processMigrationJob( job.BatchSize, chPartitions, chBatchesRaw, - chExtractorErrors, chJobErrors, &wgActivePartitions, &rowsRead, @@ -190,8 +179,6 @@ func processMigrationJob( log.Debugf("wgActivePartitions is empty (%v)", job.Name) close(chPartitions) log.Debugf("chPartitions is closed (%v)", job.Name) - close(chExtractorErrors) - log.Debugf("chExtractorErrors is closed (%v)", job.Name) wgExtractors.Wait() log.Debugf("wgExtractors is empty (%v)", job.Name) diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index d871f1a..1157459 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -200,12 +200,10 @@ func (mssqlEx *MssqlExtractor) ExtractWithRetries( chBatchesOut chan<- models.Batch, ) (int64, error) { var totalRowsRead int64 - var fatalErr error delay := time.Duration(time.Second * 1) currentParitition := partition - for fatalErr != nil || currentParitition.RetryCounter < 3 { - currentParitition.RetryCounter++ + for { rowsRead, err := mssqlEx.Extract( ctx, tableInfo, @@ -215,33 +213,36 @@ func (mssqlEx *MssqlExtractor) ExtractWithRetries( indexPrimaryKey, chBatchesOut, ) + totalRowsRead += rowsRead - if rowsRead > 0 { - totalRowsRead += int64(rowsRead) + if err == nil { + return totalRowsRead, nil } - if err != nil { - var exError *custom_errors.ExtractorError - if errors.As(err, &exError) { - if exError.HasLastId { - currentParitition.ParentId = exError.Partition.Id - currentParitition.Id = uuid.New() - currentParitition.Range.Min = exError.LastId - currentParitition.Range.IsMinInclusive = false - } + var exError *custom_errors.ExtractorError + if errors.As(err, &exError) { + currentParitition.RetryCounter++ - time.Sleep(delay) - } else { - fatalErr = err + if currentParitition.RetryCounter > 3 { + return totalRowsRead, &custom_errors.JobError{ + Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id), + Prev: err, + } } + if exError.HasLastId { + currentParitition.ParentId = exError.Partition.Id + currentParitition.Id = uuid.New() + currentParitition.Range.Min = exError.LastId + currentParitition.Range.IsMinInclusive = false + } + + time.Sleep(delay) continue } - break + return totalRowsRead, err } - - return totalRowsRead, fatalErr } func (mssqlEx *MssqlExtractor) Consume( @@ -294,6 +295,7 @@ func (mssqlEx *MssqlExtractor) Consume( indexPrimaryKey, chBatchesOut, ) + wgActivePartitions.Done() if rowsReadResult > 0 { atomic.AddInt64(rowsRead, int64(rowsReadResult)) @@ -317,8 +319,6 @@ func (mssqlEx *MssqlExtractor) Consume( continue } - - wgActivePartitions.Done() } } } diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go index 6656422..2211a22 100644 --- a/internal/app/etl/extractors/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" "sync" + "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" @@ -59,14 +60,14 @@ func (postgresEx *PostgresExtractor) Extract( partition models.Partition, indexPrimaryKey int, chBatchesOut chan<- models.Batch, -) (int, error) { +) (int64, error) { query := buildExtractQueryPostgres(tableInfo, columns) if partition.HasRange { return 0, errors.New("Batch config not yet supported") } - rowsRead := 0 + var rowsRead int64 = 0 rows, err := postgresEx.db.Query(ctx, query) if err != nil { return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} @@ -110,6 +111,61 @@ func (postgresEx *PostgresExtractor) Extract( return rowsRead, nil } +func (postgresEx *PostgresExtractor) ExtractWithRetries( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + batchSize int, + partition models.Partition, + indexPrimaryKey int, + chBatchesOut chan<- models.Batch, +) (int64, error) { + var totalRowsRead int64 + delay := time.Duration(time.Second * 1) + currentParitition := partition + + for { + rowsRead, err := postgresEx.Extract( + ctx, + tableInfo, + columns, + batchSize, + currentParitition, + indexPrimaryKey, + chBatchesOut, + ) + totalRowsRead += rowsRead + + if err == nil { + return totalRowsRead, nil + } + + var exError *custom_errors.ExtractorError + if errors.As(err, &exError) { + currentParitition.RetryCounter++ + + if currentParitition.RetryCounter > 3 { + return totalRowsRead, &custom_errors.JobError{ + Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id), + Prev: err, + } + } + + if exError.HasLastId { + currentParitition.ParentId = exError.Partition.Id + currentParitition.Id = uuid.New() + currentParitition.Range.Min = exError.LastId + currentParitition.Range.IsMinInclusive = false + } + + time.Sleep(delay) + continue + } + + return totalRowsRead, err + } +} + func (postgresEx *PostgresExtractor) Consume( ctx context.Context, tableInfo config.SourceTableInfo, @@ -117,7 +173,6 @@ func (postgresEx *PostgresExtractor) Consume( batchSize int, chPartitionsIn <-chan models.Partition, chBatchesOut chan<- models.Batch, - chErrorsOut chan<- custom_errors.ExtractorError, chJobErrorsOut chan<- custom_errors.JobError, wgActivePartitions *sync.WaitGroup, rowsRead *int64,