From 46597c4ffd8c1aba0dc6c307952d1f4f18f5668e Mon Sep 17 00:00:00 2001
From: Kylesoda <249518290+kylesoda@users.noreply.github.com>
Date: Fri, 17 Apr 2026 00:33:49 -0500
Subject: [PATCH] refactor: implement extractor retry logic and streamline
 extractor interface

---
 cmd/go_migrate/process.go                   |   4 +-
 internal/app/etl/extractors/consumer.go     |  94 ++++++++++++
 .../etl/extractors/extract-with-retries.go  |  70 +++++++++
 internal/app/etl/extractors/mssql.go        | 141 +-----------------
 internal/app/etl/extractors/postgres.go     |  72 +--------
 internal/app/etl/types.go                   |  24 +--
 6 files changed, 170 insertions(+), 235 deletions(-)
 create mode 100644 internal/app/etl/extractors/consumer.go
 create mode 100644 internal/app/etl/extractors/extract-with-retries.go

diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go
index 6058ff7..406efec 100644
--- a/cmd/go_migrate/process.go
+++ b/cmd/go_migrate/process.go
@@ -11,6 +11,7 @@ import (
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
 	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 	log "github.com/sirupsen/logrus"
@@ -119,8 +120,9 @@ func processMigrationJob(
 
 	for range maxExtractors {
 		wgExtractors.Go(func() {
-			extractor.Consume(
+			extractors.Consume(
 				localCtx,
+				extractor,
 				job.SourceTable,
 				sourceColTypes,
 				job.BatchSize,
diff --git a/internal/app/etl/extractors/consumer.go b/internal/app/etl/extractors/consumer.go
new file mode 100644
index 0000000..005083f
--- /dev/null
+++ b/internal/app/etl/extractors/consumer.go
@@ -0,0 +1,94 @@
+package extractors
+
+import (
+	"context"
+	"errors"
+	"slices"
+	"strings"
+	"sync"
+	"sync/atomic"
+
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
+)
+
+func Consume(
+	ctx context.Context,
+	extractor etl.Extractor,
+	tableInfo config.SourceTableInfo,
+	columns []models.ColumnType,
+	batchSize int,
+	chPartitionsIn <-chan models.Partition,
+	chBatchesOut chan<- models.Batch,
+	chErrorsOut chan<- custom_errors.JobError,
+	wgActivePartitions *sync.WaitGroup,
+	rowsRead *int64,
+) {
+	indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
+		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
+	})
+
+	if indexPrimaryKey == -1 {
+		select {
+		case <-ctx.Done():
+			return
+		case chErrorsOut <- custom_errors.JobError{
+			ShouldCancelJob: true,
+			Msg:             "Primary key not found in provided columns",
+		}:
+		}
+		return
+	}
+
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case partition, ok := <-chPartitionsIn:
+			if !ok {
+				return
+			}
+
+			rowsReadResult, err := extractWithRetries(
+				ctx,
+				extractor,
+				tableInfo,
+				columns,
+				batchSize,
+				partition,
+				indexPrimaryKey,
+				chBatchesOut,
+			)
+			wgActivePartitions.Done()
+
+			if rowsReadResult > 0 {
+				atomic.AddInt64(rowsRead, int64(rowsReadResult))
+			}
+
+			if err != nil {
+				var jobError *custom_errors.JobError
+				if errors.As(err, &jobError) {
+					select {
+					case <-ctx.Done():
+						return
+					case chErrorsOut <- *jobError:
+					}
+				} else {
+					select {
+					case <-ctx.Done():
+						return
+					case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
+					}
+				}
+
+				continue
+			}
+		}
+	}
+}
diff --git a/internal/app/etl/extractors/extract-with-retries.go b/internal/app/etl/extractors/extract-with-retries.go
new file mode 100644
index 0000000..154c4ac
--- /dev/null
+++ b/internal/app/etl/extractors/extract-with-retries.go
@@ -0,0 +1,70 @@
+package extractors
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
+	"github.com/google/uuid"
+)
+
+func extractWithRetries(
+	ctx context.Context,
+	extractor etl.Extractor,
+	tableInfo config.SourceTableInfo,
+	columns []models.ColumnType,
+	batchSize int,
+	partition models.Partition,
+	indexPrimaryKey int,
+	chBatchesOut chan<- models.Batch,
+) (int64, error) {
+	var totalRowsRead int64
+	delay := time.Duration(time.Second * 1)
+	currentParitition := partition
+
+	for {
+		rowsRead, err := extractor.Exec(
+			ctx,
+			tableInfo,
+			columns,
+			batchSize,
+			currentParitition,
+			indexPrimaryKey,
+			chBatchesOut,
+		)
+		totalRowsRead += rowsRead
+
+		if err == nil {
+			return totalRowsRead, nil
+		}
+
+		var exError *custom_errors.ExtractorError
+		if errors.As(err, &exError) {
+			currentParitition.RetryCounter++
+
+			if currentParitition.RetryCounter > 3 {
+				return totalRowsRead, &custom_errors.JobError{
+					Msg:  fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
+					Prev: err,
+				}
+			}
+
+			if exError.HasLastId {
+				currentParitition.ParentId = exError.Partition.Id
+				currentParitition.Id = uuid.New()
+				currentParitition.Range.Min = exError.LastId
+				currentParitition.Range.IsMinInclusive = false
+			}
+
+			time.Sleep(delay)
+			continue
+		}
+
+		return totalRowsRead, err
+	}
+}
diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go
index 71c227f..e4522c0 100644
--- a/internal/app/etl/extractors/mssql.go
+++ b/internal/app/etl/extractors/mssql.go
@@ -3,20 +3,13 @@ package extractors
 import (
 	"context"
 	"database/sql"
-	"errors"
 	"fmt"
-	"slices"
 	"strings"
-	"sync"
-	"sync/atomic"
-	"time"
 
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
-	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
 	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
-	"github.com/google/uuid"
 )
 
 type MssqlExtractor struct {
@@ -71,7 +64,7 @@ func buildExtractQueryMssql(
 	return sbQuery.String()
 }
 
-func (mssqlEx *MssqlExtractor) Extract(
+func (mssqlEx *MssqlExtractor) Exec(
 	ctx context.Context,
 	tableInfo config.SourceTableInfo,
 	columns []models.ColumnType,
@@ -131,135 +124,3 @@
 
 	return rowsRead, rows.Err()
 }
-
-func (mssqlEx *MssqlExtractor) ExtractWithRetries(
-	ctx context.Context,
-	tableInfo config.SourceTableInfo,
-	columns []models.ColumnType,
-	batchSize int,
-	partition models.Partition,
-	indexPrimaryKey int,
-	chBatchesOut chan<- models.Batch,
-) (int64, error) {
-	var totalRowsRead int64
-	delay := time.Duration(time.Second * 1)
-	currentParitition := partition
-
-	for {
-		rowsRead, err := mssqlEx.Extract(
-			ctx,
-			tableInfo,
-			columns,
-			batchSize,
-			currentParitition,
-			indexPrimaryKey,
-			chBatchesOut,
-		)
-		totalRowsRead += rowsRead
-
-		if err == nil {
-			return totalRowsRead, nil
-		}
-
-		var exError *custom_errors.ExtractorError
-		if errors.As(err, &exError) {
-			currentParitition.RetryCounter++
-
-			if currentParitition.RetryCounter > 3 {
-				return totalRowsRead, &custom_errors.JobError{
-					Msg:  fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
-					Prev: err,
-				}
-			}
-
-			if exError.HasLastId {
-				currentParitition.ParentId = exError.Partition.Id
-				currentParitition.Id = uuid.New()
-				currentParitition.Range.Min = exError.LastId
-				currentParitition.Range.IsMinInclusive = false
-			}
-
-			time.Sleep(delay)
-			continue
-		}
-
-		return totalRowsRead, err
-	}
-}
-
-func (mssqlEx *MssqlExtractor) Consume(
-	ctx context.Context,
-	tableInfo config.SourceTableInfo,
-	columns []models.ColumnType,
-	batchSize int,
-	chPartitionsIn <-chan models.Partition,
-	chBatchesOut chan<- models.Batch,
-	chErrorsOut chan<- custom_errors.JobError,
-	wgActivePartitions *sync.WaitGroup,
-	rowsRead *int64,
-) {
-	indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
-		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
-	})
-
-	if indexPrimaryKey == -1 {
-		select {
-		case <-ctx.Done():
-			return
-		case chErrorsOut <- custom_errors.JobError{
-			ShouldCancelJob: true,
-			Msg:             "Primary key not found in provided columns",
-		}:
-		}
-		return
-	}
-
-	for {
-		if ctx.Err() != nil {
-			return
-		}
-
-		select {
-		case <-ctx.Done():
-			return
-		case partition, ok := <-chPartitionsIn:
-			if !ok {
-				return
-			}
-
-			rowsReadResult, err := mssqlEx.ExtractWithRetries(
-				ctx,
-				tableInfo,
-				columns,
-				batchSize,
-				partition,
-				indexPrimaryKey,
-				chBatchesOut,
-			)
-			wgActivePartitions.Done()
-
-			if rowsReadResult > 0 {
-				atomic.AddInt64(rowsRead, int64(rowsReadResult))
-			}
-
-			if err != nil {
-				var jobError *custom_errors.JobError
-				if errors.As(err, &jobError) {
-					select {
-					case <-ctx.Done():
-						return
-					case chErrorsOut <- *jobError:
-					}
-				} else {
-					select {
-					case <-ctx.Done():
-						return
-					case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
-					}
-				}
-
-				continue
-			}
-		}
-	}
-}
diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go
index 2211a22..8e23779 100644
--- a/internal/app/etl/extractors/postgres.go
+++ b/internal/app/etl/extractors/postgres.go
@@ -5,8 +5,6 @@ import (
 	"errors"
 	"fmt"
 	"strings"
-	"sync"
-	"time"
 
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
@@ -52,7 +50,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
 	return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
 }
 
-func (postgresEx *PostgresExtractor) Extract(
+func (postgresEx *PostgresExtractor) Exec(
 	ctx context.Context,
 	tableInfo config.SourceTableInfo,
 	columns []models.ColumnType,
@@ -110,71 +108,3 @@
 
 	return rowsRead, nil
 }
-
-func (postgresEx *PostgresExtractor) ExtractWithRetries(
-	ctx context.Context,
-	tableInfo config.SourceTableInfo,
-	columns []models.ColumnType,
-	batchSize int,
-	partition models.Partition,
-	indexPrimaryKey int,
-	chBatchesOut chan<- models.Batch,
-) (int64, error) {
-	var totalRowsRead int64
-	delay := time.Duration(time.Second * 1)
-	currentParitition := partition
-
-	for {
-		rowsRead, err := postgresEx.Extract(
-			ctx,
-			tableInfo,
-			columns,
-			batchSize,
-			currentParitition,
-			indexPrimaryKey,
-			chBatchesOut,
-		)
-		totalRowsRead += rowsRead
-
-		if err == nil {
-			return totalRowsRead, nil
-		}
-
-		var exError *custom_errors.ExtractorError
-		if errors.As(err, &exError) {
-			currentParitition.RetryCounter++
-
-			if currentParitition.RetryCounter > 3 {
-				return totalRowsRead, &custom_errors.JobError{
-					Msg:  fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
-					Prev: err,
-				}
-			}
-
-			if exError.HasLastId {
-				currentParitition.ParentId = exError.Partition.Id
-				currentParitition.Id = uuid.New()
-				currentParitition.Range.Min = exError.LastId
-				currentParitition.Range.IsMinInclusive = false
-			}
-
-			time.Sleep(delay)
-			continue
-		}
-
-		return totalRowsRead, err
-	}
-}
-
-func (postgresEx *PostgresExtractor) Consume(
-	ctx context.Context,
-	tableInfo config.SourceTableInfo,
-	columns []models.ColumnType,
-	batchSize int,
-	chPartitionsIn <-chan models.Partition,
-	chBatchesOut chan<- models.Batch,
-	chJobErrorsOut chan<- custom_errors.JobError,
-	wgActivePartitions *sync.WaitGroup,
-	rowsRead *int64,
-) {
-}
diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go
index 2981bfc..ad8b058 100644
--- a/internal/app/etl/types.go
+++ b/internal/app/etl/types.go
@@ -10,7 +10,7 @@ import (
 )
 
 type Extractor interface {
-	Extract(
+	Exec(
 		ctx context.Context,
 		tableInfo config.SourceTableInfo,
 		columns []models.ColumnType,
@@ -19,28 +19,6 @@
 		indexPrimaryKey int,
 		chBatchesOut chan<- models.Batch,
 	) (int64, error)
-
-	ExtractWithRetries(
-		ctx context.Context,
-		tableInfo config.SourceTableInfo,
-		columns []models.ColumnType,
-		batchSize int,
-		partition models.Partition,
-		indexPrimaryKey int,
-		chBatchesOut chan<- models.Batch,
-	) (int64, error)
-
-	Consume(
-		ctx context.Context,
-		tableInfo config.SourceTableInfo,
-		columns []models.ColumnType,
-		batchSize int,
-		chPartitionsIn <-chan models.Partition,
-		chBatchesOut chan<- models.Batch,
-		chJobErrorsOut chan<- custom_errors.JobError,
-		wgActivePartitions *sync.WaitGroup,
-		rowsRead *int64,
-	)
 }
 
 type TransformerFunc func(any) (any, error)