refactor: standardize job error channel size; update batch size for transformed batches

This commit is contained in:
2026-05-05 23:20:34 -05:00
parent 2a5f703f3c
commit f12937a1c3

View File

@@ -20,6 +20,8 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
const jobErrorsChannelSize int = 100
func buildTruncateQuery(targetDbType, schema, table, truncateMethod string) string { func buildTruncateQuery(targetDbType, schema, table, truncateMethod string) string {
if truncateMethod == "DELETE" { if truncateMethod == "DELETE" {
if targetDbType == "postgres" { if targetDbType == "postgres" {
@@ -110,11 +112,11 @@ func processMigrationJob(
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
} }
chJobErrors := make(chan custom_errors.JobError, job.ExtractorQueueSize) chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize) chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
chPartitions := make(chan models.Partition, job.ExtractorQueueSize) chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize) chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
chBatchesTransformed := make(chan models.Batch, job.ExtractorQueueSize) chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
var wgActivePartitions sync.WaitGroup var wgActivePartitions sync.WaitGroup
var wgActiveBatches sync.WaitGroup var wgActiveBatches sync.WaitGroup