diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index 40e9c0c..fba5a8c 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -73,14 +73,14 @@ func buildExtractQueryMssql( func errorFromLastRow( lastRow models.UnknownRowValues, indexPrimaryKey int, - partition *models.Partition, + partition models.Partition, previousError error, ) *custom_errors.ExtractorError { lastIdRawValue := lastRow[indexPrimaryKey] lastId, ok := convert.ToInt64(lastIdRawValue) if !ok { - currentPartition := *partition + currentPartition := partition currentPartition.RetryCounter = 3 return &custom_errors.ExtractorError{ Partition: currentPartition, @@ -91,7 +91,7 @@ func errorFromLastRow( } return &custom_errors.ExtractorError{ - Partition: *partition, + Partition: partition, HasLastId: true, LastId: lastId, Msg: previousError.Error(), @@ -106,8 +106,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition( partition models.Partition, indexPrimaryKey int, chBatchesOut chan<- models.Batch, - rowsRead *int64, -) error { +) (int, error) { query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive) var queryArgs []any @@ -118,9 +117,10 @@ func (mssqlEx *MssqlExtractor) ProcessPartition( ) } + rowsRead := 0 rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...) 
if err != nil { - return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} } defer rows.Close() @@ -136,7 +136,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition( if err := rows.Scan(scanArgs...); err != nil { if len(batchRows) == 0 { - return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} } lastRow := batchRows[len(batchRows)-1] @@ -144,52 +144,48 @@ func (mssqlEx *MssqlExtractor) ProcessPartition( select { case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case <-ctx.Done(): - return nil + return rowsRead, ctx.Err() } - atomic.AddInt64(rowsRead, int64(len(batchRows))) - - return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err) + return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) } + rowsRead++ batchRows = append(batchRows, rowValues) - if len(batchRows) >= batchSize { select { case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case <-ctx.Done(): - return nil + return rowsRead, ctx.Err() } - atomic.AddInt64(rowsRead, int64(len(batchRows))) batchRows = make([]models.UnknownRowValues, 0, batchSize) } + } if err := rows.Err(); err != nil { if errors.Is(err, ctx.Err()) { - return ctx.Err() + return rowsRead, ctx.Err() } - if len(batchRows) == 0 { - return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + if len(batchRows) > 0 { + lastRow := batchRows[len(batchRows)-1] + return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) } - lastRow := batchRows[len(batchRows)-1] - return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err) + return rowsRead, 
&custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} } if len(batchRows) > 0 { select { case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case <-ctx.Done(): - return nil + return rowsRead, ctx.Err() } - - atomic.AddInt64(rowsRead, int64(len(batchRows))) } - return nil + return rowsRead, nil } func (mssqlEx *MssqlExtractor) Exec( @@ -234,7 +230,7 @@ func (mssqlEx *MssqlExtractor) Exec( return } - err := mssqlEx.ProcessPartition( + rowsReadResult, err := mssqlEx.ProcessPartition( ctx, tableInfo, columns, @@ -242,9 +238,12 @@ func (mssqlEx *MssqlExtractor) Exec( partition, indexPrimaryKey, chBatchesOut, - rowsRead, ) + if rowsReadResult > 0 { + atomic.AddInt64(rowsRead, int64(rowsReadResult)) + } + if err != nil { var exError *custom_errors.ExtractorError var jobError *custom_errors.JobError diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go index 6cd1d3a..e2b522c 100644 --- a/internal/app/etl/extractors/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -6,7 +6,6 @@ import ( "fmt" "strings" "sync" - "sync/atomic" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" @@ -60,17 +59,17 @@ func (postgresEx *PostgresExtractor) ProcessPartition( partition models.Partition, indexPrimaryKey int, chBatchesOut chan<- models.Batch, - rowsRead *int64, -) error { +) (int, error) { query := buildExtractQueryPostgres(tableInfo, columns) if partition.ShouldUseRange { - return errors.New("Batch config not yet supported") + return 0, errors.New("Batch config not yet supported") } + rowsRead := 0 rows, err := postgresEx.db.Query(ctx, query) if err != nil { - return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} } 
defer rows.Close() @@ -79,8 +78,9 @@ func (postgresEx *PostgresExtractor) ProcessPartition( for rows.Next() { values, err := rows.Values() if err != nil { - return errors.New("Unexpected error reading rows from source") + return rowsRead, errors.New("Unexpected error reading rows from source") } + rowsRead++ batchRows = append(batchRows, values) @@ -88,29 +88,26 @@ select { case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case <-ctx.Done(): - return nil + return rowsRead, ctx.Err() } - atomic.AddInt64(rowsRead, int64(len(batchRows))) batchRows = make([]models.UnknownRowValues, 0, batchSize) } } if err := rows.Err(); err != nil { - return errors.New("Unexpected error reading rows from source") + return rowsRead, errors.New("Unexpected error reading rows from source") } if len(batchRows) > 0 { select { case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case <-ctx.Done(): - return nil + return rowsRead, ctx.Err() } - - atomic.AddInt64(rowsRead, int64(len(batchRows))) } - return nil + return rowsRead, nil } func (postgresEx *PostgresExtractor) Exec( diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index a2c07a5..e6c8ee5 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -18,8 +18,7 @@ type Extractor interface { partition models.Partition, indexPrimaryKey int, chBatchesOut chan<- models.Batch, - rowsRead *int64, - ) error + ) (int, error) Exec( ctx context.Context,