refactor: remove unused error channels and enhance job configuration; add `max_failed_batches_load` parameter

This commit is contained in:
2026-05-08 22:28:31 -05:00
parent c4e233401b
commit 212d3663e2
4 changed files with 54 additions and 59 deletions

View File

@@ -114,7 +114,6 @@ func processMigrationJob(
} }
chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize) chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
chPartitions := make(chan models.Partition, job.ExtractorQueueSize) chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize) chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize) chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
@@ -133,16 +132,6 @@ func processMigrationJob(
} }
}() }()
go custom_errors.LoaderErrorHandler(
localCtx,
job.Retry,
job.MaxExtractorBatchErrors,
chLoadersErrors,
chBatchesTransformed,
chJobErrors,
&wgActiveBatches,
)
maxExtractors := min(job.MaxExtractors, len(partitions)) maxExtractors := min(job.MaxExtractors, len(partitions))
log.Infof("Starting %d extractor(s)...", maxExtractors) log.Infof("Starting %d extractor(s)...", maxExtractors)
@@ -223,8 +212,6 @@ func processMigrationJob(
log.Debugf("wgActiveBatches is empty (%v)", job.Name) log.Debugf("wgActiveBatches is empty (%v)", job.Name)
close(chBatchesTransformed) close(chBatchesTransformed)
log.Debugf("chBatchesTransformed is empty (%v)", job.Name) log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
close(chLoadersErrors)
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
wgLoaders.Wait() wgLoaders.Wait()
log.Debugf("wgLoaders is empty (%v)", job.Name) log.Debugf("wgLoaders is empty (%v)", job.Name)

View File

@@ -12,10 +12,11 @@ defaults:
transformer_queue_size: 8 transformer_queue_size: 8
max_loaders: 4 max_loaders: 4
loader_batch_size: 25000 loader_batch_size: 25000
max_failed_partitions: 5
max_failed_batches_transform: 5
max_failed_batches_load: 5
truncate_target: true truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE truncate_method: TRUNCATE # TRUNCATE | DELETE
max_partition_errrors: 5
max_extractor_batch_errors: 5
retry: retry:
attempts: 3 attempts: 3
base_delay_ms: 500 base_delay_ms: 500
@@ -34,11 +35,11 @@ jobs:
table: MANZANA table: MANZANA
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
range: # range:
min: 1000000 # min: 1000000
max: 2000000 # max: 2000000
is_min_inclusive: false # is_min_inclusive: false
is_max_inclusive: true # is_max_inclusive: true
- name: red_puerto - name: red_puerto
enabled: true enabled: true
@@ -57,25 +58,29 @@ jobs:
post_sql: post_sql:
- "SELECT 1" - "SELECT 1"
- name: infraestructura_site_holder__attach # - name: infraestructura_site_holder__attach
source: # source:
schema: Infraestructura # schema: Infraestructura
table: SITE_HOLDER__ATTACH # table: SITE_HOLDER__ATTACH
primary_key: GDB_ARCHIVE_OID # primary_key: GDB_ARCHIVE_OID
target: # target:
schema: Infraestructura # schema: Infraestructura
table: SITE_HOLDER__ATTACH # table: SITE_HOLDER__ATTACH
to_storage: # to_storage:
columns: # columns:
- source: DATA # - source: DATA
target: FILE_URL # target: FILE_URL
mode: REFERENCE_ONLY # mode: REFERENCE_ONLY
max_extractors: 8 # batches_per_partition: 10000
max_loaders: 4 # max_extractors: 8
queue_size: 32 # extractor_queue_size: 32
batch_size: 1 # extractor_batch_size: 1
retry: # max_transformers: 16
attempts: 5 # transformer_batch_size: 20000
base_delay_ms: 1000 # transformer_queue_size: 8
max_delay_ms: 15000 # max_loaders: 4
max_jitter_ms: 500 # retry:
# attempts: 5
# base_delay_ms: 1000
# max_delay_ms: 15000
# max_jitter_ms: 500

View File

@@ -34,10 +34,11 @@ type JobConfig struct {
TransformerQueueSize int `yaml:"transformer_queue_size"` TransformerQueueSize int `yaml:"transformer_queue_size"`
MaxLoaders int `yaml:"max_loaders"` MaxLoaders int `yaml:"max_loaders"`
LoaderBatchSize int `yaml:"loader_batch_size"` LoaderBatchSize int `yaml:"loader_batch_size"`
MaxFailedPartitions int `yaml:"max_failed_partitions"`
MaxFailedBatchesTransform int `yaml:"max_failed_batches_transform"`
MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"`
TruncateTarget bool `yaml:"truncate_target"` TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"` TruncateMethod string `yaml:"truncate_method"`
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"`
Retry RetryConfig `yaml:"retry"` Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64 RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"` ToStorage ToStorageConfig `yaml:"to_storage"`

View File

@@ -18,6 +18,7 @@ func (gl *GenericLoader) ProcessBatchWithRetries(
retryConfig config.RetryConfig, retryConfig config.RetryConfig,
batch models.Batch, batch models.Batch,
) (int64, error) { ) (int64, error) {
retries := 0
for { for {
rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch) rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
if err == nil { if err == nil {
@@ -25,7 +26,8 @@ func (gl *GenericLoader) ProcessBatchWithRetries(
} }
if btError, ok := errors.AsType[*custom_errors.LoaderError](err); ok { if btError, ok := errors.AsType[*custom_errors.LoaderError](err); ok {
batch.RetryCounter++ retries++
batch.RetryCounter = retries
if batch.RetryCounter >= retryConfig.Attempts { if batch.RetryCounter >= retryConfig.Attempts {
return rowsLoaded, &custom_errors.JobError{ return rowsLoaded, &custom_errors.JobError{