diff --git a/cmd/go_migrate/extractor-error-handler.go b/cmd/go_migrate/extractor-error-handler.go index a223631..7f8e7fd 100644 --- a/cmd/go_migrate/extractor-error-handler.go +++ b/cmd/go_migrate/extractor-error-handler.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "sync" "github.com/google/uuid" ) @@ -25,6 +26,7 @@ func extractorErrorHandler( chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chJobErrorsOut chan<- JobError, + wgActiveBatches *sync.WaitGroup, ) { for { if ctx.Err() != nil { @@ -52,6 +54,8 @@ func extractorErrorHandler( case <-ctx.Done(): return } + + wgActiveBatches.Done() continue } diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go index bdeb3ab..3756d48 100644 --- a/cmd/go_migrate/extractor.go +++ b/cmd/go_migrate/extractor.go @@ -6,6 +6,7 @@ import ( "errors" "slices" "strings" + "sync" "time" "github.com/google/uuid" @@ -33,6 +34,7 @@ func extractFromMssql( chChunksOut chan<- Chunk, chErrorsOut chan<- ExtractorError, chJobErrorsOut chan<- JobError, + wgActiveBatches *sync.WaitGroup, ) { indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool { return strings.EqualFold(col.name, job.PrimaryKey) @@ -66,7 +68,7 @@ func extractFromMssql( return } - if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut); abort { + if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort { return } } @@ -83,6 +85,7 @@ func processBatch( indexPrimaryKey int, chChunksOut chan<- Chunk, chErrorsOut chan<- ExtractorError, + wgActiveBatches *sync.WaitGroup, ) (abort bool) { query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive) log.Debug("Query used to extract data from mssql: ", query) @@ -199,6 +202,7 @@ func processBatch( } } + wgActiveBatches.Done() return false } diff --git a/cmd/go_migrate/job-error-handler.go b/cmd/go_migrate/job-error-handler.go index 5930bc9..aa7652b 100644 --- a/cmd/go_migrate/job-error-handler.go +++ b/cmd/go_migrate/job-error-handler.go @@ -40,7 +40,7 @@ func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error { return &err } - log.Error(err) + log.Error(err.Msg, " - ", err.Prev) } } } diff --git a/cmd/go_migrate/loader-error-handler.go b/cmd/go_migrate/loader-error-handler.go index c1ac8c1..205fa73 100644 --- a/cmd/go_migrate/loader-error-handler.go +++ b/cmd/go_migrate/loader-error-handler.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "sync" ) type LoaderError struct { @@ -19,6 +20,7 @@ func loaderErrorHandler( chErrorsIn <-chan LoaderError, chChunksOut chan<- Chunk, chJobErrorsOut chan<- JobError, + wgActiveChunks *sync.WaitGroup, ) { for { if ctx.Err() != nil { @@ -46,6 +48,8 @@ func loaderErrorHandler( case <-ctx.Done(): return } + + wgActiveChunks.Done() continue } diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go index a677bc1..344b0d3 100644 --- a/cmd/go_migrate/loader.go +++ b/cmd/go_migrate/loader.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "sync" "time" "github.com/jackc/pgx/v5" @@ -19,6 +20,7 @@ func loadRowsPostgres( columns []ColumnType, chChunksIn <-chan Chunk, chErrorsOut chan<- LoaderError, + wgActiveChunks *sync.WaitGroup, ) { tableId := pgx.Identifier{job.Schema, job.Table} colNames := Map(columns, func(col ColumnType) string { @@ -38,7 +40,7 @@ func loadRowsPostgres( return } - if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut); abort { + if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, wgActiveChunks); abort { return } } @@ -52,6 +54,7 @@ func loadChunkPostgres( colNames []string, chunk Chunk, chErrorsOut chan<- LoaderError, + wgActiveChunks *sync.WaitGroup, ) (abort bool) { chunkStartTime := time.Now() _, err := db.CopyFrom( @@ -75,6 +78,7 @@ func loadChunkPostgres( log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec) + wgActiveChunks.Done() return false } diff --git a/cmd/go_migrate/log.go b/cmd/go_migrate/log.go index a7afee3..a7bad26 100644 --- a/cmd/go_migrate/log.go +++ b/cmd/go_migrate/log.go @@ -13,5 +13,5 @@ func configureLog() { DisableSorting: false, PadLevelText: true, }) - log.SetLevel(log.DebugLevel) + log.SetLevel(log.InfoLevel) } diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index cc6b457..949a8f5 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -32,89 +32,82 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration log.Error("Unexpected error calculating batch ranges: ", err) } - chJobErrors := make(chan JobError) - defer close(chJobErrors) - - go func() { - if err := jobErrorHandler(ctx, chJobErrors); err != nil { - if ctx.Err() == nil { - cancel() - } - } - }() - - chBatches := make(chan Batch, len(batches)) - chExtractorErrors := make(chan ExtractorError, len(batches)) - - go func() { - extractorErrorHandler(ctx, chExtractorErrors, chBatches, chJobErrors) - }() - + chJobErrors := make(chan JobError, 100) + chBatches := make(chan Batch, QueueSize) + chExtractorErrors := make(chan ExtractorError, QueueSize) chChunksRaw := make(chan Chunk, QueueSize) - maxExtractors := min(NumExtractors, len(batches)) - var wgMssqlExtractors sync.WaitGroup - - log.Infof("Starting %d MSSQL extractors...", maxExtractors) - extractStartTime := time.Now() - for range maxExtractors { - wgMssqlExtractors.Go(func() { - extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors) - }) - } - - go func() { - for _, br := range batches { - chBatches <- br - } - close(chBatches) - close(chExtractorErrors) - }() - - go func() { - wgMssqlExtractors.Wait() - close(chChunksRaw) - log.Infof("Extraction completed in %v", time.Since(extractStartTime)) - }() - chChunksTransformed := make(chan Chunk, QueueSize) - var wgMssqlTransformers sync.WaitGroup + chLoadersErrors := make(chan LoaderError, QueueSize) + + var wgActiveBatches sync.WaitGroup + var wgActiveChunks sync.WaitGroup + var wgExtractors sync.WaitGroup + var wgTransformers sync.WaitGroup + var wgLoaders sync.WaitGroup + + go jobErrorHandler(ctx, chJobErrors) + go extractorErrorHandler(ctx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) + go loaderErrorHandler(ctx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) + + maxExtractors := min(NumExtractors, len(batches)) + log.Infof("Starting %d extractors...", maxExtractors) + extractStartTime := time.Now() - log.Infof("Starting %d MSSQL transformers...", maxExtractors) - transformStartTime := time.Now() for range maxExtractors { - wgMssqlTransformers.Go(func() { - transformRowsMssql(ctx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors) + wgExtractors.Go(func() { + extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches) }) } + wgActiveBatches.Add(len(batches)) go func() { - wgMssqlTransformers.Wait() - close(chChunksTransformed) - log.Infof("Transformation completed in %v", time.Since(transformStartTime)) + for _, batch := range batches { + chBatches <- batch + } }() - var wgPostgresLoaders sync.WaitGroup - chLoadersErrors := make(chan LoaderError) + log.Infof("Starting %d transformers...", maxExtractors) + transformStartTime := time.Now() - go func() { - loaderErrorHandler(ctx, chLoadersErrors, chChunksTransformed, chJobErrors) - }() + for range maxExtractors { + wgTransformers.Go(func() { + transformRowsMssql(ctx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks) + }) + } log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders) - loaderStartTime := time.Now() + loadStartTime := time.Now() for range NumLoaders { - wgPostgresLoaders.Go(func() { - loadRowsPostgres(ctx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors) + wgLoaders.Go(func() { + loadRowsPostgres(ctx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, &wgActiveChunks) }) } - wgPostgresLoaders.Wait() - close(chLoadersErrors) - log.Infof("Loading completed in %v", time.Since(loaderStartTime)) + go func() { + wgActiveBatches.Wait() + close(chBatches) + close(chExtractorErrors) - totalDuration := time.Since(jobStartTime) - log.Infof("Migration job completed successfully! Total time: %v", totalDuration) + wgExtractors.Wait() + log.Infof("Extraction completed in %v", time.Since(extractStartTime)) + close(chChunksRaw) + + wgTransformers.Wait() + log.Infof("Transformation completed in %v", time.Since(transformStartTime)) + + wgActiveChunks.Wait() + close(chChunksTransformed) + close(chLoadersErrors) + + wgLoaders.Wait() + log.Infof("Loading completed in %v", time.Since(loadStartTime)) + + cancel() + }() + + <-ctx.Done() + log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime)) } func logColumnTypes(columnTypes []ColumnType, label string) { diff --git a/cmd/go_migrate/transformer.go b/cmd/go_migrate/transformer.go index 184e5a6..4107a0c 100644 --- a/cmd/go_migrate/transformer.go +++ b/cmd/go_migrate/transformer.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "sync" "time" log "github.com/sirupsen/logrus" @@ -21,6 +22,7 @@ func transformRowsMssql( chChunksIn <-chan Chunk, chChunksOut chan<- Chunk, chJobErrorsOut chan<- JobError, + wgActiveChunks *sync.WaitGroup, ) { transformationPlan := computeTransformationPlan(columns) @@ -41,6 +43,7 @@ func transformRowsMssql( if len(transformationPlan) == 0 { select { case chChunksOut <- chunk: + wgActiveChunks.Add(1) continue case <-ctx.Done(): return @@ -69,6 +72,8 @@ func transformRowsMssql( case <-ctx.Done(): return } + + wgActiveChunks.Add(1) } } }