diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go index f1c1bd5..95373dc 100644 --- a/internal/app/custom_errors/extractor.error.go +++ b/internal/app/custom_errors/extractor.error.go @@ -43,6 +43,7 @@ func ExtractorErrorHandler( } if err.Partition.RetryCounter >= maxRetryAttempts { + wgActivePartitions.Done() jobError := JobError{ ShouldCancelJob: false, Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, maxRetryAttempts), @@ -55,8 +56,19 @@ func ExtractorErrorHandler( return } - wgActivePartitions.Done() continue + } else { + jobError := JobError{ + ShouldCancelJob: false, + Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter), + Prev: &err, + } + + select { + case chJobErrorsOut <- jobError: + case <-ctx.Done(): + return + } } newPartition := err.Partition diff --git a/internal/app/custom_errors/job.error.go b/internal/app/custom_errors/job.error.go index ca359af..8326068 100644 --- a/internal/app/custom_errors/job.error.go +++ b/internal/app/custom_errors/job.error.go @@ -37,11 +37,11 @@ func JobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error { } if err.ShouldCancelJob { - log.Error(err.Msg, " - ", err.Prev) + log.Errorf("(Fatal job error) - %v - %v", err.Msg, err.Prev) return &err } - log.Error(err.Msg, " - ", err.Prev) + log.Errorf("%v - %v", err.Msg, err.Prev) } } } diff --git a/internal/app/custom_errors/loader.error.go b/internal/app/custom_errors/loader.error.go index a568285..aabe416 100644 --- a/internal/app/custom_errors/loader.error.go +++ b/internal/app/custom_errors/loader.error.go @@ -40,6 +40,7 @@ func LoaderErrorHandler( } if err.RetryCounter >= maxRetryAttempts { + wgActiveBatches.Done() jobError := JobError{ ShouldCancelJob: false, Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Id, maxRetryAttempts), @@ -52,8 +53,19 @@ func LoaderErrorHandler( return } - wgActiveBatches.Done() continue + } else { + jobError := JobError{ + ShouldCancelJob: false, + Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter), + Prev: &err, + } + + select { + case chJobErrorsOut <- jobError: + case <-ctx.Done(): + return + } } err.RetryCounter++ diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index 3535c85..40e9c0c 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -247,24 +247,28 @@ func (mssqlEx *MssqlExtractor) Exec( if err != nil { var exError *custom_errors.ExtractorError + var jobError *custom_errors.JobError if errors.As(err, &exError) { select { case <-ctx.Done(): return case chErrorsOut <- *exError: } - } - - var jobError *custom_errors.JobError - if errors.As(err, &jobError) { + } else if errors.As(err, &jobError) { select { case <-ctx.Done(): return case chJobErrorsOut <- *jobError: } + } else { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: + } } - return + continue } wgActivePartitions.Done() diff --git a/internal/app/etl/loaders/postgres.go b/internal/app/etl/loaders/postgres.go index fc6bc88..4560a7c 100644 --- a/internal/app/etl/loaders/postgres.go +++ b/internal/app/etl/loaders/postgres.go @@ -97,24 +97,28 @@ func (postgresLd *PostgresLoader) Exec( if err != nil { var ldError *custom_errors.LoaderError + var jobError *custom_errors.JobError if errors.As(err, &ldError) { select { case <-ctx.Done(): return case chErrorsOut <- *ldError: } - } - - var jobError *custom_errors.JobError - if errors.As(err, &jobError) { + } else if errors.As(err, &jobError) { select { case <-ctx.Done(): return case chJobErrorsOut <- *jobError: } + } else { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}: + } } - return + continue } wgActiveBatches.Done()