diff --git a/internal/app/etl/extractors/process.go b/internal/app/etl/extractors/process.go index 7adc01d..24426b8 100644 --- a/internal/app/etl/extractors/process.go +++ b/internal/app/etl/extractors/process.go @@ -76,6 +76,7 @@ func (ex *GenericExtractor) ProcessPartition( batchRows := make([]models.UnknownRowValues, 0, batchSize) var rowsRead int64 = 0 + var lastRow models.UnknownRowValues for rows.Next() { rowValues := make([]any, len(columns)) @@ -98,6 +99,7 @@ func (ex *GenericExtractor) ProcessPartition( return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err) } rowsRead++ + lastRow = rowValues batchRows = append(batchRows, rowValues) if len(batchRows) >= batchSize { @@ -114,5 +116,12 @@ func (ex *GenericExtractor) ProcessPartition( return rowsRead, err } - return rowsRead, rows.Err() + if err := rows.Err(); err != nil { + if lastRow != nil { + return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err) + } + return rowsRead, err + } + + return rowsRead, nil }