refactor: adjust batch-size and queue-size configuration for extractors, transformers, and loaders; clarify row-count debug log messages and disable the blob upload timing log

This commit is contained in:
2026-05-09 01:48:36 -05:00
parent 5a8bce7701
commit a8be31c18b
4 changed files with 8 additions and 7 deletions

View File

@@ -70,14 +70,15 @@ jobs:
- source: DATA - source: DATA
target: FILE_URL target: FILE_URL
mode: REFERENCE_ONLY mode: REFERENCE_ONLY
batches_per_partition: 10000 batches_per_partition: 1000
max_extractors: 8 max_extractors: 8
extractor_queue_size: 32
extractor_batch_size: 1 extractor_batch_size: 1
extractor_queue_size: 64
max_transformers: 16 max_transformers: 16
transformer_batch_size: 20000 transformer_batch_size: 500
transformer_queue_size: 8 transformer_queue_size: 8
max_loaders: 4 max_loaders: 4
loader_batch_size: 500
retry: retry:
attempts: 5 attempts: 5
base_delay_ms: 1000 base_delay_ms: 1000

View File

@@ -73,7 +73,7 @@ func (ex *GenericExtractor) Consume(
if rowsReadResult > 0 { if rowsReadResult > 0 {
current := atomic.LoadInt64(rowsRead) current := atomic.LoadInt64(rowsRead)
logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table) logrus.Debugf("Rows read (partition extracted): +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsRead, int64(rowsReadResult)) atomic.AddInt64(rowsRead, int64(rowsReadResult))
} }

View File

@@ -84,7 +84,7 @@ func (gl *GenericLoader) Consume(
} }
current := atomic.LoadInt64(rowsLoaded) current := atomic.LoadInt64(rowsLoaded)
logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table) logrus.Debugf("Rows loaded (batch loaded): +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsLoaded, int64(processedRows)) atomic.AddInt64(rowsLoaded, int64(processedRows))
return true return true
} }

View File

@@ -103,7 +103,7 @@ func computeStorageTransformationPlan(
schema, table, sourceColName, v) schema, table, sourceColName, v)
return v, nil return v, nil
} }
start := time.Now() // start := time.Now()
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String()) blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b) blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
if err != nil { if err != nil {
@@ -113,7 +113,7 @@ func computeStorageTransformationPlan(
} }
} }
logrus.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds()) // logrus.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
return blobURL, nil return blobURL, nil
}, },
}) })