diff --git a/internal/app/etl/extractors/consumer.go b/internal/app/etl/extractors/consumer.go index 005083f..910a5f2 100644 --- a/internal/app/etl/extractors/consumer.go +++ b/internal/app/etl/extractors/consumer.go @@ -68,7 +68,7 @@ func Consume( wgActivePartitions.Done() if rowsReadResult > 0 { - atomic.AddInt64(rowsRead, int64(rowsReadResult)) + atomic.AddInt64(rowsRead, rowsReadResult) } if err != nil { @@ -86,8 +86,6 @@ func Consume( case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: } } - - continue } } } diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index e4522c0..f97558e 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -89,14 +89,9 @@ func (mssqlEx *MssqlExtractor) Exec( batchRows := make([]models.UnknownRowValues, 0, batchSize) var rowsRead int64 = 0 - rowValues := make([]any, len(columns)) - scanArgs := make([]any, len(columns)) - for i := range rowValues { - scanArgs[i] = &rowValues[i] - } - for rows.Next() { - if err := rows.Scan(scanArgs...); err != nil { + values, err := rows.Values() + if err != nil { if len(batchRows) == 0 { return rowsRead, err } @@ -110,7 +105,7 @@ func (mssqlEx *MssqlExtractor) Exec( } rowsRead++ - batchRows = append(batchRows, rowValues) + batchRows = append(batchRows, values) if len(batchRows) >= batchSize { if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil { return rowsRead, err diff --git a/internal/app/etl/transformers/mssql.go b/internal/app/etl/transformers/mssql.go index 7270ebb..8caa8e7 100644 --- a/internal/app/etl/transformers/mssql.go +++ b/internal/app/etl/transformers/mssql.go @@ -74,6 +74,10 @@ func (mssqlTr *MssqlTransformer) ProcessBatch( } } + if rowValues == nil { + continue + } + for _, task := range transformationPlan { val := rowValues[task.Index] if val == nil {