diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 1e35c33..ec30597 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -114,7 +114,6 @@ func processMigrationJob( } chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize) - chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize) chPartitions := make(chan models.Partition, job.ExtractorQueueSize) chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize) chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize) @@ -133,16 +132,6 @@ func processMigrationJob( } }() - go custom_errors.LoaderErrorHandler( - localCtx, - job.Retry, - job.MaxExtractorBatchErrors, - chLoadersErrors, - chBatchesTransformed, - chJobErrors, - &wgActiveBatches, - ) - maxExtractors := min(job.MaxExtractors, len(partitions)) log.Infof("Starting %d extractor(s)...", maxExtractors) @@ -223,8 +212,6 @@ func processMigrationJob( log.Debugf("wgActiveBatches is empty (%v)", job.Name) close(chBatchesTransformed) log.Debugf("chBatchesTransformed is empty (%v)", job.Name) - close(chLoadersErrors) - log.Debugf("chLoadersErrors is empty (%v)", job.Name) wgLoaders.Wait() log.Debugf("wgLoaders is empty (%v)", job.Name) diff --git a/config.yaml b/config.yaml index 9c50787..f8887b1 100644 --- a/config.yaml +++ b/config.yaml @@ -12,10 +12,11 @@ defaults: transformer_queue_size: 8 max_loaders: 4 loader_batch_size: 25000 + max_failed_partitions: 5 + max_failed_batches_transform: 5 + max_failed_batches_load: 5 truncate_target: true truncate_method: TRUNCATE # TRUNCATE | DELETE - max_partition_errrors: 5 - max_extractor_batch_errors: 5 retry: attempts: 3 base_delay_ms: 500 @@ -34,11 +35,11 @@ jobs: table: MANZANA pre_sql: - 'SELECT 1' - range: - min: 1000000 - max: 2000000 - is_min_inclusive: false - is_max_inclusive: true + # range: + # min: 1000000 + # max: 2000000 + # is_min_inclusive: false + # is_max_inclusive: true - name: red_puerto enabled: true @@ -57,25 +58,29 @@ jobs: post_sql: - "SELECT 1" - - name: infraestructura_site_holder__attach - source: - schema: Infraestructura - table: SITE_HOLDER__ATTACH - primary_key: GDB_ARCHIVE_OID - target: - schema: Infraestructura - table: SITE_HOLDER__ATTACH - to_storage: - columns: - - source: DATA - target: FILE_URL - mode: REFERENCE_ONLY - max_extractors: 8 - max_loaders: 4 - queue_size: 32 - batch_size: 1 - retry: - attempts: 5 - base_delay_ms: 1000 - max_delay_ms: 15000 - max_jitter_ms: 500 + # - name: infraestructura_site_holder__attach + # source: + # schema: Infraestructura + # table: SITE_HOLDER__ATTACH + # primary_key: GDB_ARCHIVE_OID + # target: + # schema: Infraestructura + # table: SITE_HOLDER__ATTACH + # to_storage: + # columns: + # - source: DATA + # target: FILE_URL + # mode: REFERENCE_ONLY + # batches_per_partition: 10000 + # max_extractors: 8 + # extractor_queue_size: 32 + # extractor_batch_size: 1 + # max_transformers: 16 + # transformer_batch_size: 20000 + # transformer_queue_size: 8 + # max_loaders: 4 + # retry: + # attempts: 5 + # base_delay_ms: 1000 + # max_delay_ms: 15000 + # max_jitter_ms: 500 diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index cb5dd41..dc19935 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -25,22 +25,23 @@ type ToStorageConfig struct { } type JobConfig struct { - BatchesPerPartition int `yaml:"batches_per_partition"` - MaxExtractors int `yaml:"max_extractors"` - ExtractorBatchSize int `yaml:"extractor_batch_size"` - ExtractorQueueSize int `yaml:"extractor_queue_size"` - MaxTransformers int `yaml:"max_transformers"` - TransformerBatchSize int `yaml:"transformer_batch_size"` - TransformerQueueSize int `yaml:"transformer_queue_size"` - MaxLoaders int `yaml:"max_loaders"` - LoaderBatchSize int `yaml:"loader_batch_size"` - TruncateTarget bool `yaml:"truncate_target"` - TruncateMethod string `yaml:"truncate_method"` - MaxPartitionErrrors int `yaml:"max_partition_errrors"` - MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"` - Retry RetryConfig `yaml:"retry"` - RowsPerPartition int64 - ToStorage ToStorageConfig `yaml:"to_storage"` + BatchesPerPartition int `yaml:"batches_per_partition"` + MaxExtractors int `yaml:"max_extractors"` + ExtractorBatchSize int `yaml:"extractor_batch_size"` + ExtractorQueueSize int `yaml:"extractor_queue_size"` + MaxTransformers int `yaml:"max_transformers"` + TransformerBatchSize int `yaml:"transformer_batch_size"` + TransformerQueueSize int `yaml:"transformer_queue_size"` + MaxLoaders int `yaml:"max_loaders"` + LoaderBatchSize int `yaml:"loader_batch_size"` + MaxFailedPartitions int `yaml:"max_failed_partitions"` + MaxFailedBatchesTransform int `yaml:"max_failed_batches_transform"` + MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"` + TruncateTarget bool `yaml:"truncate_target"` + TruncateMethod string `yaml:"truncate_method"` + Retry RetryConfig `yaml:"retry"` + RowsPerPartition int64 + ToStorage ToStorageConfig `yaml:"to_storage"` } type FromJsonItem struct { diff --git a/internal/app/etl/loaders/process-with-retries.go b/internal/app/etl/loaders/process-with-retries.go index 8c1179c..55d8413 100644 --- a/internal/app/etl/loaders/process-with-retries.go +++ b/internal/app/etl/loaders/process-with-retries.go @@ -18,6 +18,7 @@ func (gl *GenericLoader) ProcessBatchWithRetries( retryConfig config.RetryConfig, batch models.Batch, ) (int64, error) { + retries := 0 for { rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch) if err == nil { @@ -25,7 +26,8 @@ func (gl *GenericLoader) ProcessBatchWithRetries( } if btError, ok := errors.AsType[*custom_errors.LoaderError](err); ok { - batch.RetryCounter++ + retries++ + batch.RetryCounter = retries if batch.RetryCounter >= retryConfig.Attempts { return rowsLoaded, &custom_errors.JobError{