fix: enhance error handling in extractor and loader processes; ensure proper job error propagation and logging

2026-04-12 20:11:19 -05:00
parent 01780b4b02
commit 5633dc98d0
5 changed files with 46 additions and 14 deletions
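
The hunks below build JobError values with ShouldCancelJob, Msg, and Prev fields, and JobErrorHandler hands back &err as an error. A minimal sketch of a type consistent with that usage — the field names come from the patch, while the Error() method and the exact type of Prev are assumptions:

// Sketch only. Field names are taken from the diffs below; the Error()
// method and the type of Prev are inferred (Prev is sent as &err, where
// err is an ExtractorError or LoaderError, so a plain error interface fits).
package custom_errors

import "fmt"

type JobError struct {
	ShouldCancelJob bool   // true aborts the whole job; false is log-and-continue
	Msg             string // human-readable description
	Prev            error  // the underlying extractor/loader error
}

// Error makes *JobError usable as an error value, which JobErrorHandler
// relies on when it returns &err for a fatal error.
func (e *JobError) Error() string {
	return fmt.Sprintf("%s - %v", e.Msg, e.Prev)
}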

View File

@@ -43,6 +43,7 @@ func ExtractorErrorHandler(
 		}
 		if err.Partition.RetryCounter >= maxRetryAttempts {
+			wgActivePartitions.Done()
 			jobError := JobError{
 				ShouldCancelJob: false,
 				Msg:             fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, maxRetryAttempts),
@@ -55,8 +56,19 @@ func ExtractorErrorHandler(
 				return
 			}
-			wgActivePartitions.Done()
 			continue
+		} else {
+			jobError := JobError{
+				ShouldCancelJob: false,
+				Msg:             fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter),
+				Prev:            &err,
+			}
+			select {
+			case chJobErrorsOut <- jobError:
+			case <-ctx.Done():
+				return
+			}
 		}
 		newPartition := err.Partition
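
The first hunk above moves wgActivePartitions.Done() ahead of the channel send. With the old ordering, a cancellation that arrived while the send blocked hit the ctx.Done() return before Done() ever ran, so whatever was parked in the WaitGroup's Wait never woke up; the new else branch additionally reports transient failures as non-fatal JobErrors instead of retrying silently. A self-contained sketch of the ordering rule — all names here are illustrative, not from this repo:

// handle abandons one unit of work: it marks the unit done first, then
// tries to report a job error. The ordering is the point of the patch:
// if the send blocks and ctx is cancelled, the early return can no
// longer strand the WaitGroup count.
package main

import (
	"context"
	"fmt"
	"sync"
)

func handle(ctx context.Context, wg *sync.WaitGroup, out chan<- string) {
	wg.Done() // before the potentially blocking send, not after it
	select {
	case out <- "partition abandoned after max retries":
	case <-ctx.Done():
		return // safe: Done() already ran, wg.Wait() cannot hang
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	out := make(chan string) // unbuffered: a send blocks until read

	wg.Add(1)
	go handle(ctx, &wg, out)

	cancel()  // nobody ever reads `out`, as in a cancelled job
	wg.Wait() // returns promptly; with Done() after the send, this hung
	fmt.Println("all partitions accounted for")
}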

View File

@@ -37,11 +37,11 @@ func JobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
 			}
 			if err.ShouldCancelJob {
-				log.Error(err.Msg, " - ", err.Prev)
+				log.Errorf("(Fatal job error) - %v - %v", err.Msg, err.Prev)
 				return &err
 			}
-			log.Error(err.Msg, " - ", err.Prev)
+			log.Errorf("%v - %v", err.Msg, err.Prev)
 		}
 	}
 }
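
After this patch the handler plausibly reads as below. Only the two logging lines and the signature in the hunk header are confirmed by the diff; the receive scaffolding is reconstructed from the context lines, log.Errorf is assumed to be a logrus-style package-level logger, and *JobError is assumed to implement error (see the type sketch above):

// Reconstruction, not verbatim source. Assumes:
//   import ("context"; log "github.com/sirupsen/logrus")
// and the JobError sketch above, so &err satisfies error.
func JobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case err, ok := <-chErrorsIn:
			if !ok {
				return nil
			}
			if err.ShouldCancelJob {
				// Fatal: tag the log line and hand the error back so the
				// caller can cancel the whole job.
				log.Errorf("(Fatal job error) - %v - %v", err.Msg, err.Prev)
				return &err
			}
			// Non-fatal: log it and keep draining the channel.
			log.Errorf("%v - %v", err.Msg, err.Prev)
		}
	}
}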

View File

@@ -40,6 +40,7 @@ func LoaderErrorHandler(
 		}
 		if err.RetryCounter >= maxRetryAttempts {
+			wgActiveBatches.Done()
 			jobError := JobError{
 				ShouldCancelJob: false,
 				Msg:             fmt.Sprintf("Batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
@@ -52,8 +53,19 @@ func LoaderErrorHandler(
 				return
 			}
-			wgActiveBatches.Done()
 			continue
+		} else {
+			jobError := JobError{
+				ShouldCancelJob: false,
+				Msg:             fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter),
+				Prev:            &err,
+			}
+			select {
+			case chJobErrorsOut <- jobError:
+			case <-ctx.Done():
+				return
+			}
 		}
 		err.RetryCounter++
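
This mirrors the extractor change above; the context line err.RetryCounter++ shows the non-exhausted path, where the failed batch goes back into circulation with a higher counter. A self-contained sketch of that bounded-retry shape — every name here is illustrative:

// Bounded retry: re-queue a failed batch with an incremented counter
// until maxRetryAttempts is reached, then abandon it.
package main

import "fmt"

type Batch struct {
	Id           int
	RetryCounter int
}

const maxRetryAttempts = 3

func main() {
	work := make(chan Batch, 8) // buffered so re-queues never block here
	work <- Batch{Id: 1}

	for b := range work {
		if process(b) {
			fmt.Printf("batch %d done after %d retries\n", b.Id, b.RetryCounter)
			close(work)
			continue
		}
		if b.RetryCounter >= maxRetryAttempts {
			fmt.Printf("batch %d abandoned after %d retries\n", b.Id, maxRetryAttempts)
			close(work)
			continue
		}
		b.RetryCounter++ // bounded retry: re-queue with a higher counter
		work <- b
	}
}

// process stands in for the real loader call; here it succeeds once a
// batch has been retried twice, just to exercise the re-queue path.
func process(b Batch) bool { return b.RetryCounter >= 2 }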

View File

@@ -247,24 +247,28 @@ func (mssqlEx *MssqlExtractor) Exec(
 		if err != nil {
 			var exError *custom_errors.ExtractorError
+			var jobError *custom_errors.JobError
 			if errors.As(err, &exError) {
 				select {
 				case <-ctx.Done():
 					return
 				case chErrorsOut <- *exError:
 				}
-			}
-			var jobError *custom_errors.JobError
-			if errors.As(err, &jobError) {
+			} else if errors.As(err, &jobError) {
 				select {
 				case <-ctx.Done():
 					return
 				case chJobErrorsOut <- *jobError:
 				}
+			} else {
+				select {
+				case <-ctx.Done():
+					return
+				case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
+				}
 			}
-			return
+			continue
 		}
 		wgActivePartitions.Done()
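
Two things change in this hunk: the errors.As checks become a single if/else-if chain with a new fallback that wraps unrecognized errors in an ExtractorError rather than dropping them, and the trailing return becomes continue, so one failed partition no longer kills the worker loop. A self-contained sketch of the dispatch pattern, with stand-in types for the custom_errors package:

// dispatch classifies err onto one of two typed channels; anything it
// does not recognize is wrapped rather than dropped (the behavior the
// patch adds), and ctx cancellation aborts a blocked send.
package main

import (
	"context"
	"errors"
	"fmt"
)

type ExtractorError struct{ Msg string }

func (e *ExtractorError) Error() string { return e.Msg }

type JobError struct{ Msg string }

func (e *JobError) Error() string { return e.Msg }

func dispatch(ctx context.Context, err error, chErrors chan<- ExtractorError, chJobErrors chan<- JobError) {
	var exErr *ExtractorError
	var jobErr *JobError
	switch {
	case errors.As(err, &exErr):
		select {
		case chErrors <- *exErr:
		case <-ctx.Done():
		}
	case errors.As(err, &jobErr):
		select {
		case chJobErrors <- *jobErr:
		case <-ctx.Done():
		}
	default:
		// Fallback: wrap unknown errors instead of losing them.
		select {
		case chErrors <- ExtractorError{Msg: err.Error()}:
		case <-ctx.Done():
		}
	}
}

func main() {
	ctx := context.Background()
	chErrors := make(chan ExtractorError, 1)
	chJobErrors := make(chan JobError, 1)

	dispatch(ctx, fmt.Errorf("driver timeout"), chErrors, chJobErrors)
	fmt.Printf("%+v\n", <-chErrors) // wrapped by the fallback branch
}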

View File

@@ -97,24 +97,28 @@ func (postgresLd *PostgresLoader) Exec(
 		if err != nil {
 			var ldError *custom_errors.LoaderError
+			var jobError *custom_errors.JobError
 			if errors.As(err, &ldError) {
 				select {
 				case <-ctx.Done():
 					return
 				case chErrorsOut <- *ldError:
 				}
-			}
-			var jobError *custom_errors.JobError
-			if errors.As(err, &jobError) {
+			} else if errors.As(err, &jobError) {
 				select {
 				case <-ctx.Done():
 					return
 				case chJobErrorsOut <- *jobError:
 				}
+			} else {
+				select {
+				case <-ctx.Done():
+					return
+				case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
+				}
 			}
-			return
+			continue
 		}
 		wgActiveBatches.Done()