diff --git a/internal/app/etl/extractors/main.go b/internal/app/etl/extractors/main.go index d3137ac..f0a58e3 100644 --- a/internal/app/etl/extractors/main.go +++ b/internal/app/etl/extractors/main.go @@ -3,12 +3,14 @@ package extractors import ( "context" "errors" + "fmt" "slices" "strings" "sync" "sync/atomic" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" @@ -57,7 +59,7 @@ func Consume( return } - rowsReadResult, err := extractor.Exec( + rowsReadResult, err := extractor.ProcessPartition( ctx, tableInfo, columns, @@ -99,3 +101,30 @@ func Consume( } } } + +func errorFromLastRow( + lastRow models.UnknownRowValues, + indexPrimaryKey int, + partition models.Partition, + previousError error, +) error { + lastIdRawValue := lastRow[indexPrimaryKey] + + lastId, ok := convert.ToInt64(lastIdRawValue) + if !ok { + currentPartition := partition + currentPartition.RetryCounter = 3 + return &custom_errors.ExtractorError{ + Partition: currentPartition, + HasLastId: true, + Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), + } + } + + return &custom_errors.ExtractorError{ + Partition: partition, + HasLastId: true, + LastId: lastId, + Msg: previousError.Error(), + } +} diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index e2b1300..dd455e1 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -8,7 +8,6 @@ import ( "strings" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" @@ -88,35 +87,7 @@ func buildExtractQueryMssql( return sbQuery.String() } -func errorFromLastRow( - lastRow models.UnknownRowValues, - indexPrimaryKey int, - partition models.Partition, - previousError error, -) *custom_errors.ExtractorError { - lastIdRawValue := lastRow[indexPrimaryKey] - - lastId, ok := convert.ToInt64(lastIdRawValue) - if !ok { - currentPartition := partition - currentPartition.RetryCounter = 3 - return &custom_errors.ExtractorError{ - Partition: currentPartition, - HasLastId: true, - Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), - } - - } - - return &custom_errors.ExtractorError{ - Partition: partition, - HasLastId: true, - LastId: lastId, - Msg: previousError.Error(), - } -} - -func (mssqlEx *MssqlExtractor) Exec( +func (mssqlEx *MssqlExtractor) ProcessPartition( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go index 964f940..a45ba57 100644 --- a/internal/app/etl/extractors/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -91,7 +91,7 @@ func buildExtractQueryPostgres( return query } -func (postgresEx *PostgresExtractor) Exec( +func (postgresEx *PostgresExtractor) ProcessPartition( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType, diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index f56f8d1..b2eb87b 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -10,7 +10,7 @@ import ( ) type Extractor interface { - Exec( + ProcessPartition( ctx context.Context, tableInfo config.SourceTableInfo, columns []models.ColumnType,