feat: enhance migration job processing with parallel execution and improved logging
This commit is contained in:
@@ -2,21 +2,15 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
NumExtractors int = 4
|
|
||||||
NumLoaders int = 8
|
|
||||||
ChunkSize int = 25000
|
|
||||||
QueueSize int = 8
|
|
||||||
ChunksPerBatch int = 16
|
|
||||||
RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch)
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
configureLog()
|
configureLog()
|
||||||
|
|
||||||
@@ -33,7 +27,6 @@ func main() {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
log.Info("=== Starting migration ===")
|
log.Info("=== Starting migration ===")
|
||||||
log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize)
|
|
||||||
|
|
||||||
sourceDb, targetDb, connError := connectToDatabases()
|
sourceDb, targetDb, connError := connectToDatabases()
|
||||||
if connError != nil {
|
if connError != nil {
|
||||||
@@ -43,12 +36,52 @@ func main() {
|
|||||||
defer sourceDb.Close()
|
defer sourceDb.Close()
|
||||||
defer targetDb.Close()
|
defer targetDb.Close()
|
||||||
|
|
||||||
for _, job := range migrationConfig.Jobs {
|
processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
|
||||||
log.Infof(">>> Processing job: %s.%s <<<", job.SourceTable.Schema, job.SourceTable.Table)
|
|
||||||
processMigrationJob(ctx, sourceDb, targetDb, job)
|
|
||||||
}
|
|
||||||
|
|
||||||
totalDuration := time.Since(startTime)
|
totalDuration := time.Since(startTime)
|
||||||
log.Infof("=== Migration completed successfully! ===")
|
log.Infof("=== Migration completed successfully! ===")
|
||||||
log.Infof("Total migration time: %v", totalDuration)
|
log.Infof("Total migration time: %v", totalDuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func processMigrationJobs(
|
||||||
|
ctx context.Context,
|
||||||
|
sourceDb *sql.DB,
|
||||||
|
targetDb *pgxpool.Pool,
|
||||||
|
jobs []config.Job,
|
||||||
|
maxParallelWorkers int,
|
||||||
|
) {
|
||||||
|
if len(jobs) == 0 {
|
||||||
|
log.Info("No migration jobs configured")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxParallelWorkers <= 0 {
|
||||||
|
maxParallelWorkers = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxParallelWorkers > len(jobs) {
|
||||||
|
maxParallelWorkers = len(jobs)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
|
||||||
|
|
||||||
|
chJobs := make(chan config.Job, len(jobs))
|
||||||
|
var wgJobs sync.WaitGroup
|
||||||
|
|
||||||
|
for i := range maxParallelWorkers {
|
||||||
|
wgJobs.Go(func() {
|
||||||
|
for job := range chJobs {
|
||||||
|
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
|
||||||
|
processMigrationJob(ctx, sourceDb, targetDb, job)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
chJobs <- job
|
||||||
|
}
|
||||||
|
|
||||||
|
close(chJobs)
|
||||||
|
|
||||||
|
wgJobs.Wait()
|
||||||
|
}
|
||||||
|
|||||||
@@ -118,7 +118,7 @@ func processMigrationJob(
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
<-jobCtx.Done()
|
<-jobCtx.Done()
|
||||||
log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime))
|
log.Infof("Migration job completed (%s.%s). Total time: %v", job.SourceTable.Schema, job.SourceTable.Table, time.Since(jobStartTime))
|
||||||
}
|
}
|
||||||
|
|
||||||
func logColumnTypes(columnTypes []ColumnType, label string) {
|
func logColumnTypes(columnTypes []ColumnType, label string) {
|
||||||
|
|||||||
23
config.yaml
23
config.yaml
@@ -1,11 +1,11 @@
|
|||||||
max_parallel_workers: 2
|
max_parallel_workers: 4
|
||||||
|
|
||||||
defaults:
|
defaults:
|
||||||
max_extractors: 4
|
max_extractors: 2
|
||||||
max_loaders: 8
|
max_loaders: 4
|
||||||
queue_size: 8
|
queue_size: 8
|
||||||
chunk_size: 50000
|
chunk_size: 25000
|
||||||
chunks_per_batch: 10
|
chunks_per_batch: 8
|
||||||
truncate_target: true
|
truncate_target: true
|
||||||
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
||||||
retry:
|
retry:
|
||||||
@@ -21,19 +21,6 @@ jobs:
|
|||||||
target:
|
target:
|
||||||
schema: Cartografia
|
schema: Cartografia
|
||||||
table: MANZANA
|
table: MANZANA
|
||||||
max_extractors: 2 # overrides default config
|
|
||||||
max_loaders: 4 # overrides default config
|
|
||||||
queue_size: 4 # overrides default config
|
|
||||||
chunk_size: 25000 # overrides default config
|
|
||||||
chunks_per_batch: 8 # overrides default config
|
|
||||||
truncate_target: false # overrides default config
|
|
||||||
truncate_method: DELETE # overrides default config
|
|
||||||
retry:
|
|
||||||
attempts: 5 # overrides default config
|
|
||||||
pre_sql:
|
|
||||||
- "SELECT 1"
|
|
||||||
post_sql:
|
|
||||||
- "SELECT 2"
|
|
||||||
|
|
||||||
- name: red_puerto
|
- name: red_puerto
|
||||||
enabled: true
|
enabled: true
|
||||||
|
|||||||
Reference in New Issue
Block a user