diff --git a/internal/app/etl/extractors/process-with-retries.go b/internal/app/etl/extractors/process-with-retries.go index c04a988..d316dde 100644 --- a/internal/app/etl/extractors/process-with-retries.go +++ b/internal/app/etl/extractors/process-with-retries.go @@ -50,7 +50,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries( if currentParitition.RetryCounter >= retryConfig.Attempts { return totalRowsRead, &custom_errors.JobError{ - Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id), + Msg: fmt.Sprintf("Partition %v reached max retries (%d)", currentParitition.Id, currentParitition.RetryCounter), Prev: err, } } diff --git a/internal/app/etl/loaders/process-with-retries.go b/internal/app/etl/loaders/process-with-retries.go index 55d8413..6869192 100644 --- a/internal/app/etl/loaders/process-with-retries.go +++ b/internal/app/etl/loaders/process-with-retries.go @@ -18,7 +18,6 @@ func (gl *GenericLoader) ProcessBatchWithRetries( retryConfig config.RetryConfig, batch models.Batch, ) (int64, error) { - retries := 0 for { rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch) if err == nil { @@ -26,14 +25,12 @@ func (gl *GenericLoader) ProcessBatchWithRetries( } if btError, ok := errors.AsType[*custom_errors.LoaderError](err); ok { - retries++ - batch.RetryCounter = retries + batch.RetryCounter++ if batch.RetryCounter >= retryConfig.Attempts { return rowsLoaded, &custom_errors.JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", btError.Batch.Id, btError.Batch.RetryCounter), - Prev: btError, + Msg: fmt.Sprintf("Batch %v reached max retries (%d)", batch.Id, batch.RetryCounter), + Prev: btError, } }