diff --git a/config.yaml b/config.yaml index 8d57bfa..f0afca6 100644 --- a/config.yaml +++ b/config.yaml @@ -70,14 +70,15 @@ jobs: - source: DATA target: FILE_URL mode: REFERENCE_ONLY - batches_per_partition: 10000 + batches_per_partition: 1000 max_extractors: 8 - extractor_queue_size: 32 extractor_batch_size: 1 + extractor_queue_size: 64 max_transformers: 16 - transformer_batch_size: 20000 + transformer_batch_size: 500 transformer_queue_size: 8 max_loaders: 4 + loader_batch_size: 500 retry: attempts: 5 base_delay_ms: 1000 diff --git a/internal/app/etl/extractors/consume.go b/internal/app/etl/extractors/consume.go index 3090aab..2fd9984 100644 --- a/internal/app/etl/extractors/consume.go +++ b/internal/app/etl/extractors/consume.go @@ -73,7 +73,7 @@ func (ex *GenericExtractor) Consume( if rowsReadResult > 0 { current := atomic.LoadInt64(rowsRead) - logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table) + logrus.Debugf("Rows read (partition extracted): +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table) atomic.AddInt64(rowsRead, int64(rowsReadResult)) } diff --git a/internal/app/etl/loaders/consume.go b/internal/app/etl/loaders/consume.go index 1851f76..0b93aa9 100644 --- a/internal/app/etl/loaders/consume.go +++ b/internal/app/etl/loaders/consume.go @@ -84,7 +84,7 @@ func (gl *GenericLoader) Consume( } current := atomic.LoadInt64(rowsLoaded) - logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table) + logrus.Debugf("Rows loaded (batch loaded): +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table) atomic.AddInt64(rowsLoaded, int64(processedRows)) return true } diff --git a/internal/app/etl/transformers/plan.go b/internal/app/etl/transformers/plan.go index 435bd46..5758dba 100644 --- a/internal/app/etl/transformers/plan.go +++ b/internal/app/etl/transformers/plan.go @@ -103,7 +103,7 @@ func computeStorageTransformationPlan( schema, table, sourceColName, v) return v, nil } - start := time.Now() + // start := time.Now() blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String()) blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b) if err != nil { @@ -113,7 +113,7 @@ func computeStorageTransformationPlan( } } - logrus.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds()) + // logrus.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds()) return blobURL, nil }, })