diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index b02ca1a..3ec7f6e 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -2,21 +2,15 @@ package main import ( "context" + "database/sql" + "sync" "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" ) -const ( - NumExtractors int = 4 - NumLoaders int = 8 - ChunkSize int = 25000 - QueueSize int = 8 - ChunksPerBatch int = 16 - RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch) -) - func main() { configureLog() @@ -33,7 +27,6 @@ func main() { defer cancel() log.Info("=== Starting migration ===") - log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize) sourceDb, targetDb, connError := connectToDatabases() if connError != nil { @@ -43,12 +36,52 @@ func main() { defer sourceDb.Close() defer targetDb.Close() - for _, job := range migrationConfig.Jobs { - log.Infof(">>> Processing job: %s.%s <<<", job.SourceTable.Schema, job.SourceTable.Table) - processMigrationJob(ctx, sourceDb, targetDb, job) - } + processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers) totalDuration := time.Since(startTime) log.Infof("=== Migration completed successfully! ===") log.Infof("Total migration time: %v", totalDuration) } + +func processMigrationJobs( + ctx context.Context, + sourceDb *sql.DB, + targetDb *pgxpool.Pool, + jobs []config.Job, + maxParallelWorkers int, +) { + if len(jobs) == 0 { + log.Info("No migration jobs configured") + return + } + + if maxParallelWorkers <= 0 { + maxParallelWorkers = 1 + } + + if maxParallelWorkers > len(jobs) { + maxParallelWorkers = len(jobs) + } + + log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers) + + chJobs := make(chan config.Job, len(jobs)) + var wgJobs sync.WaitGroup + + for i := range maxParallelWorkers { + wgJobs.Go(func() { + for job := range chJobs { + log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) + processMigrationJob(ctx, sourceDb, targetDb, job) + } + }) + } + + for _, job := range jobs { + chJobs <- job + } + + close(chJobs) + + wgJobs.Wait() +}
diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 5175513..a9d107a 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -118,7 +118,7 @@ func processMigrationJob( }() <-jobCtx.Done() - log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime)) + log.Infof("Migration job completed (%s.%s). Total time: %v", job.SourceTable.Schema, job.SourceTable.Table, time.Since(jobStartTime)) } func logColumnTypes(columnTypes []ColumnType, label string) {
diff --git a/config.yaml b/config.yaml index 31a74d3..8a47470 100644 --- a/config.yaml +++ b/config.yaml @@ -1,11 +1,11 @@ -max_parallel_workers: 2 +max_parallel_workers: 4 defaults: - max_extractors: 4 - max_loaders: 8 + max_extractors: 2 + max_loaders: 4 queue_size: 8 - chunk_size: 50000 - chunks_per_batch: 10 + chunk_size: 25000 + chunks_per_batch: 8 truncate_target: true truncate_method: TRUNCATE # TRUNCATE | DELETE retry: @@ -21,19 +21,6 @@ jobs: target: schema: Cartografia table: MANZANA - max_extractors: 2 # overrides default config - max_loaders: 4 # overrides default config - queue_size: 4 # overrides default config - chunk_size: 25000 # overrides default config - chunks_per_batch: 8 # overrides default config - truncate_target: false # overrides default config - truncate_method: DELETE # overrides default config - retry: - attempts: 5 # overrides default config - pre_sql: - - "SELECT 1" - post_sql: - - "SELECT 2" - name: red_puerto enabled: true