refactor: update migration job configuration parameters; rename queue and batch size fields for clarity

This commit is contained in:
2026-05-05 23:04:27 -05:00
parent 6414943cf3
commit 7cb959a103
3 changed files with 34 additions and 26 deletions

View File

@@ -110,11 +110,11 @@ func processMigrationJob(
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
} }
chJobErrors := make(chan custom_errors.JobError, job.QueueSize) chJobErrors := make(chan custom_errors.JobError, job.ExtractorQueueSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize) chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
chPartitions := make(chan models.Partition, job.QueueSize) chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
chBatchesRaw := make(chan models.Batch, job.QueueSize) chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
chBatchesTransformed := make(chan models.Batch, job.QueueSize) chBatchesTransformed := make(chan models.Batch, job.ExtractorQueueSize)
var wgActivePartitions sync.WaitGroup var wgActivePartitions sync.WaitGroup
var wgActiveBatches sync.WaitGroup var wgActiveBatches sync.WaitGroup
@@ -133,7 +133,7 @@ func processMigrationJob(
go custom_errors.LoaderErrorHandler( go custom_errors.LoaderErrorHandler(
localCtx, localCtx,
job.Retry, job.Retry,
job.MaxChunkErrors, job.MaxExtractorBatchErrors,
chLoadersErrors, chLoadersErrors,
chBatchesTransformed, chBatchesTransformed,
chJobErrors, chJobErrors,
@@ -149,7 +149,7 @@ func processMigrationJob(
localCtx, localCtx,
job.SourceTable, job.SourceTable,
sourceColTypes, sourceColTypes,
job.BatchSize, job.ExtractorBatchSize,
chPartitions, chPartitions,
chBatchesRaw, chBatchesRaw,
chJobErrors, chJobErrors,

View File

@@ -3,15 +3,19 @@ source_db_type: sqlserver
target_db_type: postgres target_db_type: postgres
defaults: defaults:
max_extractors: 2
max_loaders: 4
queue_size: 8
batch_size: 25000
batches_per_partition: 8 batches_per_partition: 8
max_extractors: 2
extractor_batch_size: 25000
extractor_queue_size: 8
max_transformers: 2
transformer_batch_size: 25000
transformer_queue_size: 8
max_loaders: 4
loader_batch_size: 25000
truncate_target: true truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE truncate_method: TRUNCATE # TRUNCATE | DELETE
max_partition_errrors: 5 max_partition_errrors: 5
max_chunk_errors: 5 max_extractor_batch_errors: 5
retry: retry:
attempts: 3 attempts: 3
base_delay_ms: 500 base_delay_ms: 500

View File

@@ -25,18 +25,22 @@ type ToStorageConfig struct {
} }
type JobConfig struct { type JobConfig struct {
MaxExtractors int `yaml:"max_extractors"` BatchesPerPartition int `yaml:"batches_per_partition"`
MaxLoaders int `yaml:"max_loaders"` MaxExtractors int `yaml:"max_extractors"`
QueueSize int `yaml:"queue_size"` ExtractorBatchSize int `yaml:"extractor_batch_size"`
BatchSize int `yaml:"batch_size"` ExtractorQueueSize int `yaml:"extractor_queue_size"`
BatchesPerPartition int `yaml:"batches_per_partition"` MaxTransformers int `yaml:"max_transformers"`
TruncateTarget bool `yaml:"truncate_target"` TransformerBatchSize int `yaml:"transformer_batch_size"`
TruncateMethod string `yaml:"truncate_method"` TransformerQueueSize int `yaml:"transformer_queue_size"`
MaxPartitionErrrors int `yaml:"max_partition_errrors"` MaxLoaders int `yaml:"max_loaders"`
MaxChunkErrors int `yaml:"max_chunk_errors"` LoaderBatchSize int `yaml:"loader_batch_size"`
Retry RetryConfig `yaml:"retry"` TruncateTarget bool `yaml:"truncate_target"`
RowsPerPartition int64 TruncateMethod string `yaml:"truncate_method"`
ToStorage ToStorageConfig `yaml:"to_storage"` MaxPartitionErrrors int `yaml:"max_partition_errrors"`
MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
} }
type TableInfo struct { type TableInfo struct {
@@ -97,7 +101,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
c.Defaults = raw.Defaults c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition) c.Defaults.RowsPerPartition = int64(raw.Defaults.ExtractorBatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs { for _, node := range raw.Jobs {
job := Job{ job := Job{
@@ -108,7 +112,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
return err return err
} }
job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition) job.RowsPerPartition = int64(job.ExtractorBatchSize * job.BatchesPerPartition)
c.Jobs = append(c.Jobs, job) c.Jobs = append(c.Jobs, job)
} }