From 9a00d6af045ad0c0d07f8506a518e2c0252050e9 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Sun, 26 Apr 2026 19:33:59 -0500 Subject: [PATCH] refactor: replace specific extractor implementations with a generic extractor; remove mssql and postgres extractor files --- cmd/go_migrate/main.go | 2 +- cmd/go_migrate/process.go | 5 +- internal/app/etl/extractors/main.go | 166 ++++++++++++++++++---- internal/app/etl/extractors/mssql.go | 180 ------------------------ internal/app/etl/extractors/postgres.go | 157 --------------------- 5 files changed, 138 insertions(+), 372 deletions(-) delete mode 100644 internal/app/etl/extractors/mssql.go delete mode 100644 internal/app/etl/extractors/postgres.go diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 65922aa..6b1c61e 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -118,7 +118,7 @@ func processMigrationJobs( sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) - extractor := extractors.NewMssqlExtractor(sourceDb) + extractor := extractors.NewExtractor(sourceDb) loader := loaders.NewGenericLoader(targetDb) var azureClient *azure.Client diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index b87a1f7..3d5a433 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -39,7 +39,7 @@ func processMigrationJob( targetDbWrapper dbwrapper.DbWrapper, sourceTableAnalyzer etl.TableAnalyzer, targetTableAnalyzer etl.TableAnalyzer, - extractor etl.Extractor, + extractor extractors.GenericExtractor, azureClient *azure.Client, loader etl.Loader, job config.Job, @@ -155,9 +155,8 @@ func processMigrationJob( for range maxExtractors { wgExtractors.Go(func() { - extractors.Consume( + extractor.Consume( localCtx, - extractor, job.SourceTable, sourceColTypes, job.BatchSize, diff --git a/internal/app/etl/extractors/main.go b/internal/app/etl/extractors/main.go index f0a58e3..4a7295c 100644 --- a/internal/app/etl/extractors/main.go +++ b/internal/app/etl/extractors/main.go @@ -12,13 +12,144 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" + dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" ) -func Consume( +type GenericExtractor struct { + db dbwrapper.DbWrapper +} + +func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor { + return GenericExtractor{db: db} +} + +func errorFromLastRow( + lastRow models.UnknownRowValues, + indexPrimaryKey int, + partition models.Partition, + previousError error, +) error { + lastIdRawValue := lastRow[indexPrimaryKey] + + lastId, ok := convert.ToInt64(lastIdRawValue) + if !ok { + currentPartition := partition + currentPartition.RetryCounter = 3 + return &custom_errors.ExtractorError{ + Partition: currentPartition, + HasLastId: true, + Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), + } + } + + return &custom_errors.ExtractorError{ + Partition: partition, + HasLastId: true, + LastId: lastId, + Msg: previousError.Error(), + } +} + +func (ex *GenericExtractor) ProcessPartition( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + batchSize int, + partition models.Partition, + indexPrimaryKey int, + chBatchesOut chan<- models.Batch, +) (int, error) { + rowsRead := 0 + query := dbwrapper.ExtractionQuery{ + Schema: tableInfo.Schema, + Table: tableInfo.Table, + PrimaryKey: tableInfo.PrimaryKey, + LowerLimit: dbwrapper.ExtractorQueryLimit{ + IsValid: partition.HasRange && partition.Range.Min > 0, + IsInclusive: partition.Range.IsMinInclusive, + Value: partition.Range.Min, + }, + UpperLimit: dbwrapper.ExtractorQueryLimit{ + IsValid: partition.HasRange && partition.Range.Max > 0, + IsInclusive: partition.Range.IsMaxInclusive, + Value: partition.Range.Max, + }, + } + + rows, err := ex.db.QueryFromObject(ctx, query) + + if err != nil { + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + } + defer rows.Close() + + batchRows := make([]models.UnknownRowValues, 0, batchSize) + + for rows.Next() { + rowValues := make([]any, len(columns)) + scanArgs := make([]any, len(columns)) + + for i := range rowValues { + scanArgs[i] = &rowValues[i] + } + + if err := rows.Scan(scanArgs...); err != nil { + if len(batchRows) == 0 { + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + } + + lastRow := batchRows[len(batchRows)-1] + + select { + case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: + case <-ctx.Done(): + return rowsRead, ctx.Err() + } + + return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) + } + rowsRead++ + + batchRows = append(batchRows, rowValues) + if len(batchRows) >= batchSize { + select { + case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: + case <-ctx.Done(): + return rowsRead, ctx.Err() + } + + batchRows = make([]models.UnknownRowValues, 0, batchSize) + } + } + + if err := rows.Err(); err != nil { + if errors.Is(err, ctx.Err()) { + return rowsRead, ctx.Err() + } + + if len(batchRows) > 0 { + lastRow := batchRows[len(batchRows)-1] + return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) + } + + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + } + + if len(batchRows) > 0 { + select { + case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: + case <-ctx.Done(): + return rowsRead, ctx.Err() + } + } + + return rowsRead, nil +} + +func (ex *GenericExtractor) Consume( ctx context.Context, - extractor etl.Extractor, tableInfo config.SourceTableInfo, columns []models.ColumnType, batchSize int, @@ -59,7 +190,7 @@ func Consume( return } - rowsReadResult, err := extractor.ProcessPartition( + rowsReadResult, err := ex.ProcessPartition( ctx, tableInfo, columns, @@ -101,30 +232,3 @@ func Consume( } } } - -func errorFromLastRow( - lastRow models.UnknownRowValues, - indexPrimaryKey int, - partition models.Partition, - previousError error, -) error { - lastIdRawValue := lastRow[indexPrimaryKey] - - lastId, ok := convert.ToInt64(lastIdRawValue) - if !ok { - currentPartition := partition - currentPartition.RetryCounter = 3 - return &custom_errors.ExtractorError{ - Partition: currentPartition, - HasLastId: true, - Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), - } - } - - return &custom_errors.ExtractorError{ - Partition: partition, - HasLastId: true, - LastId: lastId, - Msg: previousError.Error(), - } -} diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go deleted file mode 100644 index dd455e1..0000000 --- a/internal/app/etl/extractors/mssql.go +++ /dev/null @@ -1,180 +0,0 @@ -package extractors - -import ( - "context" - "database/sql" - "errors" - "fmt" - "strings" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" - dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/google/uuid" -) - -type MssqlExtractor struct { - db dbwrapper.DbWrapper -} - -func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor { - return &MssqlExtractor{db: db} -} - -func buildExtractQueryMssql( - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - includeRange bool, - isMinInclusive bool, - isMaxInclusive bool, - hasMin bool, - hasMax bool, -) string { - var sbQuery strings.Builder - - sbQuery.WriteString("SELECT ") - - if len(columns) == 0 { - sbQuery.WriteString("*") - } else { - for i, col := range columns { - fmt.Fprintf(&sbQuery, "[%s]", col.Name()) - - if col.Type() == "GEOMETRY" { - fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name()) - } - - if i < len(columns)-1 { - sbQuery.WriteString(", ") - } - } - } - - fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table) - - if includeRange && (hasMin || hasMax) { - sbQuery.WriteString(" WHERE ") - - if hasMin { - fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey) - if isMinInclusive { - sbQuery.WriteString(" >=") - } else { - sbQuery.WriteString(" >") - } - sbQuery.WriteString(" @min") - } - - if hasMin && hasMax { - sbQuery.WriteString(" AND ") - } - - if hasMax { - fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey) - if isMaxInclusive { - sbQuery.WriteString(" <=") - } else { - sbQuery.WriteString(" <") - } - sbQuery.WriteString(" @max") - } - } - - fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey) - - return sbQuery.String() -} - -func (mssqlEx *MssqlExtractor) ProcessPartition( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - batchSize int, - partition models.Partition, - indexPrimaryKey int, - chBatchesOut chan<- models.Batch, -) (int, error) { - hasMin := partition.HasRange && partition.Range.Min > 0 - hasMax := partition.HasRange && partition.Range.Max > 0 - query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax) - - var queryArgs []any - if hasMin { - queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min)) - } - if hasMax { - queryArgs = append(queryArgs, sql.Named("max", partition.Range.Max)) - } - - rowsRead := 0 - rows, err := mssqlEx.db.Query(ctx, query, queryArgs...) - if err != nil { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - defer rows.Close() - - batchRows := make([]models.UnknownRowValues, 0, batchSize) - - for rows.Next() { - rowValues := make([]any, len(columns)) - scanArgs := make([]any, len(columns)) - - for i := range rowValues { - scanArgs[i] = &rowValues[i] - } - - if err := rows.Scan(scanArgs...); err != nil { - if len(batchRows) == 0 { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - - lastRow := batchRows[len(batchRows)-1] - - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - - return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) - } - rowsRead++ - - batchRows = append(batchRows, rowValues) - if len(batchRows) >= batchSize { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - - batchRows = make([]models.UnknownRowValues, 0, batchSize) - } - - } - - if err := rows.Err(); err != nil { - if errors.Is(err, ctx.Err()) { - return rowsRead, ctx.Err() - } - - if len(batchRows) > 0 { - lastRow := batchRows[len(batchRows)-1] - return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) - } - - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - - if len(batchRows) > 0 { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - } - - return rowsRead, nil -} diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go deleted file mode 100644 index a45ba57..0000000 --- a/internal/app/etl/extractors/postgres.go +++ /dev/null @@ -1,157 +0,0 @@ -package extractors - -import ( - "context" - "fmt" - "strings" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" - dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/google/uuid" -) - -type PostgresExtractor struct { - db dbwrapper.DbWrapper -} - -func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor { - return &PostgresExtractor{db: db} -} - -func buildExtractQueryPostgres( - sourceDbInfo config.SourceTableInfo, - columns []models.ColumnType, - includeRange bool, - isMinInclusive bool, - isMaxInclusive bool, - hasMin bool, - hasMax bool, -) string { - var sbColumns strings.Builder - - if len(columns) == 0 { - sbColumns.WriteString("*") - } else { - for i, col := range columns { - if col.Type() == "GEOMETRY" { - sbColumns.WriteString(`ST_AsEWKB("`) - sbColumns.WriteString(col.Name()) - sbColumns.WriteString(`") AS "`) - sbColumns.WriteString(col.Name()) - sbColumns.WriteString(`"`) - } else { - sbColumns.WriteString(`"`) - sbColumns.WriteString(col.Name()) - sbColumns.WriteString(`"`) - } - - if i < len(columns)-1 { - sbColumns.WriteString(", ") - } - } - } - - query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table) - - if includeRange && (hasMin || hasMax) { - query += " WHERE " - paramIdx := 1 - - if hasMin { - query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey) - if isMinInclusive { - query += " >=" - } else { - query += " >" - } - query += fmt.Sprintf(" $%d", paramIdx) - paramIdx++ - } - - if hasMin && hasMax { - query += " AND " - } - - if hasMax { - query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey) - if isMaxInclusive { - query += " <=" - } else { - query += " <" - } - query += fmt.Sprintf(" $%d", paramIdx) - } - } - - query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey) - - return query -} - -func (postgresEx *PostgresExtractor) ProcessPartition( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - batchSize int, - partition models.Partition, - indexPrimaryKey int, - chBatchesOut chan<- models.Batch, -) (int, error) { - hasMin := partition.HasRange && partition.Range.Min > 0 - hasMax := partition.HasRange && partition.Range.Max > 0 - query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax) - - var queryArgs []any - if hasMin { - queryArgs = append(queryArgs, partition.Range.Min) - } - if hasMax { - queryArgs = append(queryArgs, partition.Range.Max) - } - - rowsRead := 0 - rows, err := postgresEx.db.Query(ctx, query, queryArgs...) - if err != nil { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - defer rows.Close() - - batchRows := make([]models.UnknownRowValues, 0, batchSize) - - for rows.Next() { - values, err := rows.Values() - if err != nil { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - rowsRead++ - - batchRows = append(batchRows, values) - - if len(batchRows) >= batchSize { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - - batchRows = make([]models.UnknownRowValues, 0, batchSize) - } - } - - if err := rows.Err(); err != nil { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - - if len(batchRows) > 0 { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, nil - } - } - - return rowsRead, nil -}