From d228a048b8d0e5e358ad09b780f02c92b3185ba6 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Fri, 10 Apr 2026 19:29:07 -0500 Subject: [PATCH] feat: update extractor error handling to use models.UnknownRowValues for improved type consistency --- cmd/go_migrate/extractor-error-handler.go | 3 +- cmd/go_migrate/extractor.go | 251 ---------------------- cmd/go_migrate/loader.go | 2 +- cmd/go_migrate/process.go | 2 +- 4 files changed, 4 insertions(+), 254 deletions(-) delete mode 100644 cmd/go_migrate/extractor.go diff --git a/cmd/go_migrate/extractor-error-handler.go b/cmd/go_migrate/extractor-error-handler.go index 7f8e7fd..86cffa4 100644 --- a/cmd/go_migrate/extractor-error-handler.go +++ b/cmd/go_migrate/extractor-error-handler.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" ) @@ -78,7 +79,7 @@ func extractorErrorHandler( } } -func ExtractorErrorFromLastRowMssql(lastRow UnknownRowValues, indexPrimaryKey int, batch *Batch, previousError error) ExtractorError { +func ExtractorErrorFromLastRowMssql(lastRow models.UnknownRowValues, indexPrimaryKey int, batch *Batch, previousError error) ExtractorError { lastIdRawValue := lastRow[indexPrimaryKey] lastId, ok := ToInt64(lastIdRawValue) diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go deleted file mode 100644 index cb2464d..0000000 --- a/cmd/go_migrate/extractor.go +++ /dev/null @@ -1,251 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "errors" - "slices" - "strings" - "sync" - "sync/atomic" - "time" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "github.com/google/uuid" - "github.com/jackc/pgx/v5/pgxpool" - _ "github.com/microsoft/go-mssqldb" - log "github.com/sirupsen/logrus" -) - -type UnknownRowValues = []any - -type Chunk struct { - Id uuid.UUID - BatchId uuid.UUID - Data []UnknownRowValues - RetryCounter int -} - -func extractFromMssql( - ctx context.Context, - db *sql.DB, - tableInfo config.SourceTableInfo, - columns []ColumnType, - chunkSize int, - chBatchesIn <-chan Batch, - chChunksOut chan<- Chunk, - chErrorsOut chan<- ExtractorError, - chJobErrorsOut chan<- JobError, - wgActiveBatches *sync.WaitGroup, - rowsRead *int64, -) { - indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool { - return strings.EqualFold(col.name, tableInfo.PrimaryKey) - }) - - if indexPrimaryKey == -1 { - jobError := JobError{ - ShouldCancelJob: true, - Msg: "Primary key not found in provided columns", - } - - select { - case <-ctx.Done(): - return - case chJobErrorsOut <- jobError: - } - - return - } - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - case batch, ok := <-chBatchesIn: - if !ok { - return - } - - if abort := processBatch(ctx, db, tableInfo, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches, rowsRead); abort { - return - } - } - } -} - -func processBatch( - ctx context.Context, - db *sql.DB, - tableInfo config.SourceTableInfo, - columns []ColumnType, - chunkSize int, - batch Batch, - indexPrimaryKey int, - chChunksOut chan<- Chunk, - chErrorsOut chan<- ExtractorError, - wgActiveBatches *sync.WaitGroup, - rowsRead *int64, -) (abort bool) { - query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive) - log.Debug("Query used to extract data from mssql: ", query) - - var queryArgs []any - if batch.ShouldUseRange { - queryArgs = append(queryArgs, - sql.Named("min", batch.LowerLimit), - sql.Named("max", batch.UpperLimit), - ) - } - - queryStartTime := time.Now() - rows, err := db.QueryContext(ctx, query, queryArgs...) - if err != nil { - select { - case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}: - case <-ctx.Done(): - return true - } - return false - } - defer rows.Close() - log.Debugf("Query executed in %v", time.Since(queryStartTime)) - - rowsChunk := make([]UnknownRowValues, 0, chunkSize) - totalRowsExtracted := 0 - chunkStartTime := time.Now() - - for rows.Next() { - values := make([]any, len(columns)) - scanArgs := make([]any, len(columns)) - - for i := range values { - scanArgs[i] = &values[i] - } - - if err := rows.Scan(scanArgs...); err != nil { - if len(rowsChunk) == 0 { - select { - case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}: - case <-ctx.Done(): - return true - } - return false - } - - lastRow := rowsChunk[len(rowsChunk)-1] - select { - case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err): - case <-ctx.Done(): - return true - } - - select { - case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}: - case <-ctx.Done(): - return true - } - - atomic.AddInt64(rowsRead, int64(len(rowsChunk))) - - return false - } - - rowsChunk = append(rowsChunk, values) - totalRowsExtracted++ - - if len(rowsChunk) >= chunkSize { - chunkDuration := time.Since(chunkStartTime) - rowsPerSec := float64(chunkSize) / chunkDuration.Seconds() - log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted) - - select { - case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}: - case <-ctx.Done(): - return true - } - - atomic.AddInt64(rowsRead, int64(len(rowsChunk))) - rowsChunk = make([]UnknownRowValues, 0, chunkSize) - chunkStartTime = time.Now() - } - } - - if err := rows.Err(); err != nil { - if errors.Is(err, ctx.Err()) { - return true - } - - if len(rowsChunk) == 0 { - select { - case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}: - case <-ctx.Done(): - return true - } - return false - } - - lastRow := rowsChunk[len(rowsChunk)-1] - select { - case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err): - case <-ctx.Done(): - return true - } - return false - } - - if len(rowsChunk) > 0 { - chunkDuration := time.Since(chunkStartTime) - rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds() - log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted) - select { - case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}: - case <-ctx.Done(): - return true - } - - atomic.AddInt64(rowsRead, int64(len(rowsChunk))) - } - - wgActiveBatches.Done() - return false -} - -func extractFromPostgres(ctx context.Context, tableInfo config.SourceTableInfo, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error { - query := buildExtractQueryPostgres(tableInfo, columns) - log.Debug("Query used to extract data from postgres: ", query) - - rows, err := db.Query(ctx, query) - if err != nil { - return err - } - defer rows.Close() - - rowsChunk := make([]UnknownRowValues, 0, chunkSize) - - for rows.Next() { - values, err := rows.Values() - if err != nil { - return err - } - - rowsChunk = append(rowsChunk, values) - - if len(rowsChunk) >= chunkSize { - out <- rowsChunk - rowsChunk = make([]UnknownRowValues, 0, chunkSize) - log.Infof("Chunk send... %+v", tableInfo) - } - } - - if len(rowsChunk) > 0 { - out <- rowsChunk - log.Infof("Chunk send... %+v", tableInfo) - } - - return nil -} diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go index 1ded1fe..70faa5e 100644 --- a/cmd/go_migrate/loader.go +++ b/cmd/go_migrate/loader.go @@ -109,7 +109,7 @@ func loadChunkPostgres( return false } -func loadRowsMssql(ctx context.Context, tableInfo config.TargetTableInfo, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error { +func loadRowsMssql(ctx context.Context, tableInfo config.TargetTableInfo, columns []ColumnType, db *sql.DB, in <-chan []models.UnknownRowValues) error { chunkCount := 0 totalRowsLoaded := 0 diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index bb32c0f..d3ee4a5 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -160,7 +160,7 @@ func logSampleRow( schema string, table string, columns []ColumnType, - rowValues UnknownRowValues, + rowValues models.UnknownRowValues, tag string, ) { log.Infof("[%s.%s] Sample row: (%s)", schema, table, tag)