From 6f2e3e28f1555a993cfbe1deea8aaf96516e09e6 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Mon, 11 May 2026 10:02:58 -0500 Subject: [PATCH] refactor: enhance error handling in ProcessPartition by tracking last row values --- internal/app/etl/extractors/process.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/app/etl/extractors/process.go b/internal/app/etl/extractors/process.go index 7adc01d..24426b8 100644 --- a/internal/app/etl/extractors/process.go +++ b/internal/app/etl/extractors/process.go @@ -76,6 +76,7 @@ func (ex *GenericExtractor) ProcessPartition( batchRows := make([]models.UnknownRowValues, 0, batchSize) var rowsRead int64 = 0 + var lastRow models.UnknownRowValues for rows.Next() { rowValues := make([]any, len(columns)) @@ -98,6 +99,7 @@ func (ex *GenericExtractor) ProcessPartition( return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err) } rowsRead++ + lastRow = rowValues batchRows = append(batchRows, rowValues) if len(batchRows) >= batchSize { @@ -114,5 +116,12 @@ func (ex *GenericExtractor) ProcessPartition( return rowsRead, err } - return rowsRead, rows.Err() + if err := rows.Err(); err != nil { + if lastRow != nil { + return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err) + } + return rowsRead, err + } + + return rowsRead, nil }