From f12937a1c3272da0ef0f0ad4d03e0ed36bf8ed50 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Tue, 5 May 2026 23:20:34 -0500 Subject: [PATCH] refactor: standardize job error channel size; update batch size for transformed batches --- cmd/go_migrate/process.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 512e040..17a7440 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -20,6 +20,8 @@ import ( "golang.org/x/sync/errgroup" ) +const jobErrorsChannelSize int = 100 + func buildTruncateQuery(targetDbType, schema, table, truncateMethod string) string { if truncateMethod == "DELETE" { if targetDbType == "postgres" { @@ -110,11 +112,11 @@ func processMigrationJob( log.Error("Unexpected error calculating batch ranges: ", err) } - chJobErrors := make(chan custom_errors.JobError, job.ExtractorQueueSize) + chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize) chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize) chPartitions := make(chan models.Partition, job.ExtractorQueueSize) chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize) - chBatchesTransformed := make(chan models.Batch, job.ExtractorQueueSize) + chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize) var wgActivePartitions sync.WaitGroup var wgActiveBatches sync.WaitGroup