diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 002d0b8..7e972c4 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -11,6 +11,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" log "github.com/sirupsen/logrus" @@ -129,8 +130,9 @@ func processMigrationJob( for range maxExtractors { wgExtractors.Go(func() { - extractor.Exec( + extractors.Consume( localCtx, + extractor, job.SourceTable, sourceColTypes, job.BatchSize, diff --git a/go.mod b/go.mod index 96170be..50f494e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module git.ksdemosapps.com/kylesoda/go-migrate -go 1.25.7 +go 1.26 require ( github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4 diff --git a/internal/app/etl/extractors/main.go b/internal/app/etl/extractors/main.go new file mode 100644 index 0000000..d3137ac --- /dev/null +++ b/internal/app/etl/extractors/main.go @@ -0,0 +1,101 @@ +package extractors + +import ( + "context" + "errors" + "slices" + "strings" + "sync" + "sync/atomic" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +func Consume( + ctx context.Context, + extractor etl.Extractor, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + batchSize int, + chPartitionsIn <-chan models.Partition, + chBatchesOut chan<- models.Batch, + chErrorsOut chan<- custom_errors.ExtractorError, + chJobErrorsOut chan<- custom_errors.JobError, + wgActivePartitions *sync.WaitGroup, + rowsRead *int64, +) { + indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { + return strings.EqualFold(col.Name(), tableInfo.PrimaryKey) + }) + + if indexPrimaryKey == -1 { + select { + case <-ctx.Done(): + return + case chJobErrorsOut <- custom_errors.JobError{ + ShouldCancelJob: true, + Msg: "Primary key not found in provided columns", + }: + } + + return + } + + for { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + return + case partition, ok := <-chPartitionsIn: + if !ok { + return + } + + rowsReadResult, err := extractor.Exec( + ctx, + tableInfo, + columns, + batchSize, + partition, + indexPrimaryKey, + chBatchesOut, + ) + + if rowsReadResult > 0 { + atomic.AddInt64(rowsRead, int64(rowsReadResult)) + } + + if err != nil { + if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok { + select { + case <-ctx.Done(): + return + case chErrorsOut <- *exError: + } + } else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { + select { + case <-ctx.Done(): + return + case chJobErrorsOut <- *jobError: + } + } else { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: + } + } + + continue + } + + wgActivePartitions.Done() + } + } +} diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index d34447a..bebc50a 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -5,10 +5,7 @@ import ( "database/sql" "errors" "fmt" - "slices" "strings" - "sync" - "sync/atomic" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" @@ -99,7 +96,7 @@ func errorFromLastRow( } } -func (mssqlEx *MssqlExtractor) ProcessPartition( +func (mssqlEx *MssqlExtractor) Exec( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, @@ -188,90 +185,3 @@ func (mssqlEx *MssqlExtractor) ProcessPartition( return rowsRead, nil } - -func (mssqlEx *MssqlExtractor) Exec( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - batchSize int, - chPartitionsIn <-chan models.Partition, - chBatchesOut chan<- models.Batch, - chErrorsOut chan<- custom_errors.ExtractorError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActivePartitions *sync.WaitGroup, - rowsRead *int64, -) { - indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { - return strings.EqualFold(col.Name(), tableInfo.PrimaryKey) - }) - - if indexPrimaryKey == -1 { - select { - case <-ctx.Done(): - return - case chJobErrorsOut <- custom_errors.JobError{ - ShouldCancelJob: true, - Msg: "Primary key not found in provided columns", - }: - } - - return - } - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - case partition, ok := <-chPartitionsIn: - if !ok { - return - } - - rowsReadResult, err := mssqlEx.ProcessPartition( - ctx, - tableInfo, - columns, - batchSize, - partition, - indexPrimaryKey, - chBatchesOut, - ) - - if rowsReadResult > 0 { - atomic.AddInt64(rowsRead, int64(rowsReadResult)) - } - - if err != nil { - var exError *custom_errors.ExtractorError - var jobError *custom_errors.JobError - if errors.As(err, &exError) { - select { - case <-ctx.Done(): - return - case chErrorsOut <- *exError: - } - } else if errors.As(err, &jobError) { - select { - case <-ctx.Done(): - return - case chJobErrorsOut <- *jobError: - } - } else { - select { - case <-ctx.Done(): - return - case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: - } - } - - continue - } - - wgActivePartitions.Done() - } - } -} diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go index 8d494d3..71b826d 100644 --- a/internal/app/etl/extractors/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "strings" - "sync" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" @@ -51,7 +50,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey) } -func (postgresEx *PostgresExtractor) ProcessPartition( +func (postgresEx *PostgresExtractor) Exec( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, @@ -109,17 +108,3 @@ func (postgresEx *PostgresExtractor) ProcessPartition( return rowsRead, nil } - -func (postgresEx *PostgresExtractor) Exec( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - batchSize int, - chPartitionsIn <-chan models.Partition, - chBatchesOut chan<- models.Batch, - chErrorsOut chan<- custom_errors.ExtractorError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActivePartitions *sync.WaitGroup, - rowsRead *int64, -) { -} diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index e6c8ee5..f56f8d1 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -10,7 +10,7 @@ import ( ) type Extractor interface { - ProcessPartition( + Exec( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, @@ -19,19 +19,6 @@ type Extractor interface { indexPrimaryKey int, chBatchesOut chan<- models.Batch, ) (int, error) - - Exec( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - batchSize int, - chPartitionsIn <-chan models.Partition, - chBatchesOut chan<- models.Batch, - chErrorsOut chan<- custom_errors.ExtractorError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActivePartitions *sync.WaitGroup, - rowsRead *int64, - ) } type TransformerFunc func(any) (any, error)