diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 5983f00..b4583c2 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -110,11 +110,11 @@ func processMigrationJob( log.Error("Unexpected error calculating batch ranges: ", err) } - chJobErrors := make(chan custom_errors.JobError, job.QueueSize) - chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize) - chPartitions := make(chan models.Partition, job.QueueSize) - chBatchesRaw := make(chan models.Batch, job.QueueSize) - chBatchesTransformed := make(chan models.Batch, job.QueueSize) + chJobErrors := make(chan custom_errors.JobError, job.ExtractorQueueSize) + chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize) + chPartitions := make(chan models.Partition, job.ExtractorQueueSize) + chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize) + chBatchesTransformed := make(chan models.Batch, job.ExtractorQueueSize) var wgActivePartitions sync.WaitGroup var wgActiveBatches sync.WaitGroup @@ -133,7 +133,7 @@ func processMigrationJob( go custom_errors.LoaderErrorHandler( localCtx, job.Retry, - job.MaxChunkErrors, + job.MaxExtractorBatchErrors, chLoadersErrors, chBatchesTransformed, chJobErrors, @@ -149,7 +149,7 @@ func processMigrationJob( localCtx, job.SourceTable, sourceColTypes, - job.BatchSize, + job.ExtractorBatchSize, chPartitions, chBatchesRaw, chJobErrors, diff --git a/config.yaml b/config.yaml index 63ac1c4..a46879c 100644 --- a/config.yaml +++ b/config.yaml @@ -3,15 +3,19 @@ source_db_type: sqlserver target_db_type: postgres defaults: - max_extractors: 2 - max_loaders: 4 - queue_size: 8 - batch_size: 25000 batches_per_partition: 8 + max_extractors: 2 + extractor_batch_size: 25000 + extractor_queue_size: 8 + max_transformers: 2 + transformer_batch_size: 25000 + transformer_queue_size: 8 + max_loaders: 4 + loader_batch_size: 25000 truncate_target: true truncate_method: TRUNCATE # TRUNCATE | DELETE max_partition_errrors: 5 - max_chunk_errors: 5 + max_extractor_batch_errors: 5 retry: attempts: 3 base_delay_ms: 500 diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index bfbae35..51baa7c 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -25,18 +25,22 @@ type ToStorageConfig struct { } type JobConfig struct { - MaxExtractors int `yaml:"max_extractors"` - MaxLoaders int `yaml:"max_loaders"` - QueueSize int `yaml:"queue_size"` - BatchSize int `yaml:"batch_size"` - BatchesPerPartition int `yaml:"batches_per_partition"` - TruncateTarget bool `yaml:"truncate_target"` - TruncateMethod string `yaml:"truncate_method"` - MaxPartitionErrrors int `yaml:"max_partition_errrors"` - MaxChunkErrors int `yaml:"max_chunk_errors"` - Retry RetryConfig `yaml:"retry"` - RowsPerPartition int64 - ToStorage ToStorageConfig `yaml:"to_storage"` + BatchesPerPartition int `yaml:"batches_per_partition"` + MaxExtractors int `yaml:"max_extractors"` + ExtractorBatchSize int `yaml:"extractor_batch_size"` + ExtractorQueueSize int `yaml:"extractor_queue_size"` + MaxTransformers int `yaml:"max_transformers"` + TransformerBatchSize int `yaml:"transformer_batch_size"` + TransformerQueueSize int `yaml:"transformer_queue_size"` + MaxLoaders int `yaml:"max_loaders"` + LoaderBatchSize int `yaml:"loader_batch_size"` + TruncateTarget bool `yaml:"truncate_target"` + TruncateMethod string `yaml:"truncate_method"` + MaxPartitionErrrors int `yaml:"max_partition_errrors"` + MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"` + Retry RetryConfig `yaml:"retry"` + RowsPerPartition int64 + ToStorage ToStorageConfig `yaml:"to_storage"` } type TableInfo struct { @@ -97,7 +101,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error { c.Defaults = raw.Defaults c.SourceDbType = raw.SourceDbType c.TargetDbType = raw.TargetDbType - c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition) + c.Defaults.RowsPerPartition = int64(raw.Defaults.ExtractorBatchSize * raw.Defaults.BatchesPerPartition) for _, node := range raw.Jobs { job := Job{ @@ -108,7 +112,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error { return err } - job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition) + job.RowsPerPartition = int64(job.ExtractorBatchSize * job.BatchesPerPartition) c.Jobs = append(c.Jobs, job) }