From 6414943cf329f3933a80fc74ed542c3086cebdf7 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Mon, 27 Apr 2026 01:11:45 -0500 Subject: [PATCH] refactor: streamline error handling and processing in GenericExtractor; implement partition processing with retries --- cmd/go_migrate/process.go | 13 ---- internal/app/db-wrapper/mssql.go | 2 + internal/app/etl/extractors/consume.go | 29 +++----- internal/app/etl/extractors/main.go | 29 ++++++++ .../etl/extractors/process-with-retries.go | 69 +++++++++++++++++++ internal/app/etl/extractors/process.go | 61 +++++----------- 6 files changed, 129 insertions(+), 74 deletions(-) create mode 100644 internal/app/etl/extractors/process-with-retries.go diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 3d5a433..5983f00 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -111,7 +111,6 @@ func processMigrationJob( } chJobErrors := make(chan custom_errors.JobError, job.QueueSize) - chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize) chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize) chPartitions := make(chan models.Partition, job.QueueSize) chBatchesRaw := make(chan models.Batch, job.QueueSize) @@ -131,15 +130,6 @@ func processMigrationJob( } }() - go custom_errors.ExtractorErrorHandler( - localCtx, - job.Retry, - job.MaxPartitionErrrors, - chExtractorErrors, - chPartitions, - chJobErrors, - &wgActivePartitions, - ) go custom_errors.LoaderErrorHandler( localCtx, job.Retry, @@ -162,7 +152,6 @@ func processMigrationJob( job.BatchSize, chPartitions, chBatchesRaw, - chExtractorErrors, chJobErrors, &wgActivePartitions, &rowsRead, @@ -216,8 +205,6 @@ func processMigrationJob( log.Debugf("wgActivePartitions is empty (%v)", job.Name) close(chPartitions) log.Debugf("chPartitions is closed (%v)", job.Name) - close(chExtractorErrors) - log.Debugf("chExtractorErrors is closed (%v)", job.Name) wgExtractors.Wait() log.Debugf("wgExtractors is empty (%v)", job.Name) diff --git a/internal/app/db-wrapper/mssql.go b/internal/app/db-wrapper/mssql.go index d8fca95..03c9284 100644 --- a/internal/app/db-wrapper/mssql.go +++ b/internal/app/db-wrapper/mssql.go @@ -233,6 +233,8 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery queryString := sbQuery.String() + // logrus.Debugf("Query: %s", queryString) + var queryArgs []any if q.LowerLimit.IsValid { diff --git a/internal/app/etl/extractors/consume.go b/internal/app/etl/extractors/consume.go index 672a9a6..f9b2dfd 100644 --- a/internal/app/etl/extractors/consume.go +++ b/internal/app/etl/extractors/consume.go @@ -11,6 +11,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/sirupsen/logrus" ) func (ex *GenericExtractor) Consume( @@ -20,8 +21,7 @@ func (ex *GenericExtractor) Consume( batchSize int, chPartitionsIn <-chan models.Partition, chBatchesOut chan<- models.Batch, - chErrorsOut chan<- custom_errors.ExtractorError, - chJobErrorsOut chan<- custom_errors.JobError, + chErrorsOut chan<- custom_errors.JobError, wgActivePartitions *sync.WaitGroup, rowsRead *int64, ) { @@ -33,7 +33,7 @@ func (ex *GenericExtractor) Consume( select { case <-ctx.Done(): return - case chJobErrorsOut <- custom_errors.JobError{ + case chErrorsOut <- custom_errors.JobError{ ShouldCancelJob: true, Msg: "Primary key not found in provided columns", }: @@ -55,7 +55,7 @@ func (ex *GenericExtractor) Consume( return } - rowsReadResult, err := ex.ProcessPartition( + rowsReadResult, err := ex.ProcessPartitionWithRetries( ctx, tableInfo, columns, @@ -64,38 +64,29 @@ func (ex *GenericExtractor) Consume( indexPrimaryKey, chBatchesOut, ) + wgActivePartitions.Done() if rowsReadResult > 0 { - // current := atomic.LoadInt64(rowsRead) - // logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table) + current := atomic.LoadInt64(rowsRead) + logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table) atomic.AddInt64(rowsRead, int64(rowsReadResult)) } if err != nil { - if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok { + if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { select { case <-ctx.Done(): return - case chErrorsOut <- *exError: - } - } else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { - select { - case <-ctx.Done(): - return - case chJobErrorsOut <- *jobError: + case chErrorsOut <- *jobError: } } else { select { case <-ctx.Done(): return - case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: } } - - continue } - - wgActivePartitions.Done() } } } diff --git a/internal/app/etl/extractors/main.go b/internal/app/etl/extractors/main.go index 22702bc..3081103 100644 --- a/internal/app/etl/extractors/main.go +++ b/internal/app/etl/extractors/main.go @@ -1,7 +1,11 @@ package extractors import ( + "context" + dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" ) type GenericExtractor struct { @@ -11,3 +15,28 @@ type GenericExtractor struct { func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor { return GenericExtractor{db: db} } + +func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error { + select { + case chBatchesOut <- batch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func flush( + ctx context.Context, + partition *models.Partition, + batchSize int, + batchRows []models.UnknownRowValues, + chBatchesOut chan<- models.Batch, +) error { + if len(batchRows) == 0 { + return nil + } + + batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows} + batchRows = make([]models.UnknownRowValues, 0, batchSize) + return sendBatch(ctx, chBatchesOut, batch) +} diff --git a/internal/app/etl/extractors/process-with-retries.go b/internal/app/etl/extractors/process-with-retries.go new file mode 100644 index 0000000..fb65b8d --- /dev/null +++ b/internal/app/etl/extractors/process-with-retries.go @@ -0,0 +1,69 @@ +package extractors + +import ( + "context" + "errors" + "fmt" + "time" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" + // "github.com/sirupsen/logrus" +) + +func (ex *GenericExtractor) ProcessPartitionWithRetries( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + batchSize int, + partition models.Partition, + indexPrimaryKey int, + chBatchesOut chan<- models.Batch, +) (int64, error) { + var totalRowsRead int64 + delay := time.Duration(time.Second * 1) + currentParitition := partition + + for { + rowsRead, err := ex.ProcessPartition( + ctx, + tableInfo, + columns, + batchSize, + currentParitition, + indexPrimaryKey, + chBatchesOut, + ) + // logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table) + totalRowsRead += rowsRead + + if err == nil { + return totalRowsRead, nil + } + + if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok { + currentParitition.RetryCounter++ + + if currentParitition.RetryCounter > 3 { + return totalRowsRead, &custom_errors.JobError{ + Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id), + Prev: err, + } + } + + if exError.HasLastId { + currentParitition.ParentId = exError.Partition.Id + currentParitition.Id = uuid.New() + currentParitition.Range.Min = exError.LastId + currentParitition.Range.IsMinInclusive = false + } + + time.Sleep(delay) + continue + } + + return totalRowsRead, err + } +} diff --git a/internal/app/etl/extractors/process.go b/internal/app/etl/extractors/process.go index 2708908..d624551 100644 --- a/internal/app/etl/extractors/process.go +++ b/internal/app/etl/extractors/process.go @@ -2,7 +2,6 @@ package extractors import ( "context" - "errors" "fmt" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" @@ -10,10 +9,10 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/google/uuid" + // "github.com/sirupsen/logrus" ) -func errorFromLastRow( +func errorFromLastPartitionRow( lastRow models.UnknownRowValues, indexPrimaryKey int, partition models.Partition, @@ -48,8 +47,7 @@ func (ex *GenericExtractor) ProcessPartition( partition models.Partition, indexPrimaryKey int, chBatchesOut chan<- models.Batch, -) (int, error) { - rowsRead := 0 +) (int64, error) { query := dbwrapper.ExtractionQuery{ Schema: tableInfo.Schema, Table: tableInfo.Table, @@ -67,15 +65,15 @@ func (ex *GenericExtractor) ProcessPartition( }, } - // logrus.Debugf("Querying with: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table) + // logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table) rows, err := ex.db.QueryFromObject(ctx, query) - if err != nil { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + return 0, err } defer rows.Close() batchRows := make([]models.UnknownRowValues, 0, batchSize) + var rowsRead int64 = 0 for rows.Next() { rowValues := make([]any, len(columns)) @@ -87,53 +85,32 @@ func (ex *GenericExtractor) ProcessPartition( if err := rows.Scan(scanArgs...); err != nil { if len(batchRows) == 0 { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + return rowsRead, err + } + + if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil { + return rowsRead, err } lastRow := batchRows[len(batchRows)-1] - - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - - return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) + return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err) } rowsRead++ batchRows = append(batchRows, rowValues) if len(batchRows) >= batchSize { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() + // logrus.Debugf("Batch size reached, flushing batch with %v rows (rowsRead=%v)", len(batchRows), rowsRead) + if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil { + // logrus.Warnf("Error flushing rows: %v", err) + return rowsRead, err } - batchRows = make([]models.UnknownRowValues, 0, batchSize) } } - if err := rows.Err(); err != nil { - if errors.Is(err, ctx.Err()) { - return rowsRead, ctx.Err() - } - - if len(batchRows) > 0 { - lastRow := batchRows[len(batchRows)-1] - return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) - } - - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil { + return rowsRead, err } - if len(batchRows) > 0 { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - } - - return rowsRead, nil + return rowsRead, rows.Err() }