From 68220e4c414ac46727af3a6fef7f7a5f82a6738c Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Mon, 11 May 2026 01:32:21 -0500 Subject: [PATCH] refactor: implement loaderAccumulator for batch processing; streamline error handling in Consume --- internal/app/etl/loaders/consume.go | 143 ++++++++++++++-------------- 1 file changed, 71 insertions(+), 72 deletions(-) diff --git a/internal/app/etl/loaders/consume.go b/internal/app/etl/loaders/consume.go index e4dfdf4..05fa57f 100644 --- a/internal/app/etl/loaders/consume.go +++ b/internal/app/etl/loaders/consume.go @@ -13,6 +13,62 @@ import ( "github.com/sirupsen/logrus" ) +type loaderAccumulator struct { + batchSize int + rows []models.UnknownRowValues + parents []models.BatchRef + pendingDone int +} + +func (a *loaderAccumulator) add(batch models.Batch) { + a.rows = append(a.rows, batch.Rows...) + a.parents = append(a.parents, models.BatchRef{Id: batch.Id}) + a.pendingDone++ +} + +func (a *loaderAccumulator) ready() bool { + return len(a.rows) >= a.batchSize +} + +func (a *loaderAccumulator) drainPending(wg *sync.WaitGroup) { + for range a.pendingDone { + wg.Done() + } +} + +func sendLoadError( + ctx context.Context, + err error, + retryConfig config.RetryConfig, + failedBatchesCount *int32, + chErrorsOut chan<- custom_errors.JobError, +) bool { + atomic.AddInt32(failedBatchesCount, 1) + + var jobErr custom_errors.JobError + if je, ok := errors.AsType[*custom_errors.JobError](err); ok { + jobErr = *je + } else { + jobErr = custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err} + } + + select { + case <-ctx.Done(): + return false + case chErrorsOut <- jobErr: + } + + if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) { + select { + case <-ctx.Done(): + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}: + } + return false + } + + return true +} + func (gl *GenericLoader) Consume( ctx context.Context, tableInfo config.TargetTableInfo, @@ -29,58 +85,29 @@ func (gl *GenericLoader) Consume( return col.Name() }) - var accRows []models.UnknownRowValues - var parentBatches []models.BatchRef - pendingDone := 0 - - defer func() { - for range pendingDone { - wgActiveBatches.Done() - } - }() + acc := &loaderAccumulator{batchSize: batchSize} + defer acc.drainPending(wgActiveBatches) flush := func() bool { - if len(accRows) == 0 { + if len(acc.rows) == 0 { return true } - count := len(parentBatches) + count := len(acc.parents) superBatch := models.Batch{ Id: uuid.New(), - ParentBatches: parentBatches, - Rows: accRows, + ParentBatches: acc.parents, + Rows: acc.rows, } processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, superBatch) for range count { wgActiveBatches.Done() } - pendingDone -= count - accRows = nil - parentBatches = nil + acc.pendingDone -= count + acc.rows = nil + acc.parents = nil if err != nil { - atomic.AddInt32(failedBatchesCount, 1) - if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { - select { - case <-ctx.Done(): - return false - case chErrorsOut <- *jobError: - } - } else { - select { - case <-ctx.Done(): - return false - case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: - } - } - - if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) { - select { - case <-ctx.Done(): - case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}: - } - return false - } - return true + return sendLoadError(ctx, err, retryConfig, failedBatchesCount, chErrorsOut) } current := atomic.LoadInt64(rowsLoaded) @@ -90,13 +117,10 @@ func (gl *GenericLoader) Consume( } for { - if ctx.Err() != nil { - return - } - select { case <-ctx.Done(): return + case batch, ok := <-chBatchesIn: if !ok { flush() @@ -106,45 +130,20 @@ func (gl *GenericLoader) Consume( if batchSize <= 0 { processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch) wgActiveBatches.Done() - if err != nil { - atomic.AddInt32(failedBatchesCount, 1) - if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { - select { - case <-ctx.Done(): - return - case chErrorsOut <- *jobError: - } - } else { - select { - case <-ctx.Done(): - return - case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: - } - } - - if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) { - select { - case <-ctx.Done(): - return - case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}: - return - } + if !sendLoadError(ctx, err, retryConfig, failedBatchesCount, chErrorsOut) { + return } continue } - current := atomic.LoadInt64(rowsLoaded) logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table) atomic.AddInt64(rowsLoaded, int64(processedRows)) continue } - pendingDone++ - accRows = append(accRows, batch.Rows...) - parentBatches = append(parentBatches, models.BatchRef{Id: batch.Id}) - - if len(accRows) >= batchSize { + acc.add(batch) + if acc.ready() { if !flush() { return }