From 1c3db39b21d4e1519a75f56b246ee42ec7854722 Mon Sep 17 00:00:00 2001 From: DiegoAlessandroMotta <153887376+DiegoAlessandroMotta@users.noreply.github.com> Date: Thu, 16 Apr 2026 23:49:23 -0500 Subject: [PATCH] update extractor interface --- cmd/go_migrate/process.go | 2 +- internal/app/etl/extractors/mssql.go | 77 ++++++++++++++++++++----- internal/app/etl/extractors/postgres.go | 4 +- internal/app/etl/types.go | 17 ++++-- 4 files changed, 78 insertions(+), 22 deletions(-) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index e525299..16ae071 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -129,7 +129,7 @@ func processMigrationJob( for range maxExtractors { wgExtractors.Go(func() { - extractor.Exec( + extractor.Consume( localCtx, job.SourceTable, sourceColTypes, diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index d34447a..d871f1a 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" @@ -99,7 +100,7 @@ func errorFromLastRow( } } -func (mssqlEx *MssqlExtractor) ProcessPartition( +func (mssqlEx *MssqlExtractor) Extract( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, @@ -107,7 +108,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition( partition models.Partition, indexPrimaryKey int, chBatchesOut chan<- models.Batch, -) (int, error) { +) (int64, error) { query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive) var queryArgs []any @@ -118,7 +119,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition( ) } - rowsRead := 0 + var rowsRead int64 = 0 rows, err := mssqlEx.db.Query(ctx, query, queryArgs...) if err != nil { return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} @@ -189,14 +190,67 @@ func (mssqlEx *MssqlExtractor) ProcessPartition( return rowsRead, nil } -func (mssqlEx *MssqlExtractor) Exec( +func (mssqlEx *MssqlExtractor) ExtractWithRetries( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + batchSize int, + partition models.Partition, + indexPrimaryKey int, + chBatchesOut chan<- models.Batch, +) (int64, error) { + var totalRowsRead int64 + var fatalErr error + delay := time.Duration(time.Second * 1) + currentParitition := partition + + for fatalErr != nil || currentParitition.RetryCounter < 3 { + currentParitition.RetryCounter++ + rowsRead, err := mssqlEx.Extract( + ctx, + tableInfo, + columns, + batchSize, + currentParitition, + indexPrimaryKey, + chBatchesOut, + ) + + if rowsRead > 0 { + totalRowsRead += int64(rowsRead) + } + + if err != nil { + var exError *custom_errors.ExtractorError + if errors.As(err, &exError) { + if exError.HasLastId { + currentParitition.ParentId = exError.Partition.Id + currentParitition.Id = uuid.New() + currentParitition.Range.Min = exError.LastId + currentParitition.Range.IsMinInclusive = false + } + + time.Sleep(delay) + } else { + fatalErr = err + } + + continue + } + + break + } + + return totalRowsRead, fatalErr +} + +func (mssqlEx *MssqlExtractor) Consume( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, batchSize int, chPartitionsIn <-chan models.Partition, chBatchesOut chan<- models.Batch, - chErrorsOut chan<- custom_errors.ExtractorError, chJobErrorsOut chan<- custom_errors.JobError, wgActivePartitions *sync.WaitGroup, rowsRead *int64, @@ -231,7 +285,7 @@ func (mssqlEx *MssqlExtractor) Exec( return } - rowsReadResult, err := mssqlEx.ProcessPartition( + rowsReadResult, err := mssqlEx.ExtractWithRetries( ctx, tableInfo, columns, @@ -246,15 +300,8 @@ func (mssqlEx *MssqlExtractor) Exec( } if err != nil { - var exError *custom_errors.ExtractorError var jobError *custom_errors.JobError - if errors.As(err, &exError) { - select { - case <-ctx.Done(): - return - case chErrorsOut <- *exError: - } - } else if errors.As(err, &jobError) { + if errors.As(err, &jobError) { select { case <-ctx.Done(): return @@ -264,7 +311,7 @@ func (mssqlEx *MssqlExtractor) Exec( select { case <-ctx.Done(): return - case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: + case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: } } diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go index 8d494d3..6656422 100644 --- a/internal/app/etl/extractors/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -51,7 +51,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey) } -func (postgresEx *PostgresExtractor) ProcessPartition( +func (postgresEx *PostgresExtractor) Extract( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, @@ -110,7 +110,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition( return rowsRead, nil } -func (postgresEx *PostgresExtractor) Exec( +func (postgresEx *PostgresExtractor) Consume( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index e6c8ee5..2981bfc 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -10,7 +10,7 @@ import ( ) type Extractor interface { - ProcessPartition( + Extract( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, @@ -18,16 +18,25 @@ type Extractor interface { partition models.Partition, indexPrimaryKey int, chBatchesOut chan<- models.Batch, - ) (int, error) + ) (int64, error) - Exec( + ExtractWithRetries( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + batchSize int, + partition models.Partition, + indexPrimaryKey int, + chBatchesOut chan<- models.Batch, + ) (int64, error) + + Consume( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, batchSize int, chPartitionsIn <-chan models.Partition, chBatchesOut chan<- models.Batch, - chErrorsOut chan<- custom_errors.ExtractorError, chJobErrorsOut chan<- custom_errors.JobError, wgActivePartitions *sync.WaitGroup, rowsRead *int64,