From 80babf24f21277a472ef3b1d5a8d81dcf7833258 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Wed, 6 May 2026 18:56:50 -0500 Subject: [PATCH] refactor: implement Consume method in GenericLoader; enhance error handling in ProcessBatch --- internal/app/etl/loaders/consume.go | 63 +++++++++++++++++++++++++++++ internal/app/etl/loaders/main.go | 3 +- 2 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 internal/app/etl/loaders/consume.go diff --git a/internal/app/etl/loaders/consume.go b/internal/app/etl/loaders/consume.go new file mode 100644 index 0000000..374a15b --- /dev/null +++ b/internal/app/etl/loaders/consume.go @@ -0,0 +1,63 @@ +package loaders + +import ( + "context" + "errors" + "sync" + "sync/atomic" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +func (gl *GenericLoader) Consume( + ctx context.Context, + tableInfo config.TargetTableInfo, + columns []models.ColumnType, + retryConfig config.RetryConfig, + chBatchesIn <-chan models.Batch, + chErrorsOut chan<- custom_errors.JobError, + wgActiveBatches *sync.WaitGroup, + rowsLoaded *int64, +) { + colNames := mapSlice(columns, func(col models.ColumnType) string { + return col.Name() + }) + + for { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + return + case batch, ok := <-chBatchesIn: + if !ok { + return + } + + processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch) + wgActiveBatches.Done() + + if err != nil { + if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { + select { + case <-ctx.Done(): + return + case chErrorsOut <- *jobError: + } + } else { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: + } + } + } else { + atomic.AddInt64(rowsLoaded, int64(processedRows)) + } + } + } +} diff --git a/internal/app/etl/loaders/main.go b/internal/app/etl/loaders/main.go index 523d6c8..39d06aa 100644 --- a/internal/app/etl/loaders/main.go +++ b/internal/app/etl/loaders/main.go @@ -38,8 +38,7 @@ func (gl *GenericLoader) ProcessBatch( ) if err != nil { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) { + if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok { if pgErr.Code == "23505" { return 0, &custom_errors.JobError{ ShouldCancelJob: true,