From 6345a0d694bd1d92b87b5081bc8fb03e9464d5e4 Mon Sep 17 00:00:00 2001
From: Kylesoda <249518290+kylesoda@users.noreply.github.com>
Date: Thu, 9 Apr 2026 21:55:19 -0500
Subject: [PATCH] feat: enhance migration job processing with detailed metrics
 and error handling

---
 cmd/go_migrate/extractor.go | 10 +++++++++-
 cmd/go_migrate/loader.go    |  6 +++++-
 cmd/go_migrate/main.go      | 41 +++++++++++++++++++++++++++++++++++++------
 cmd/go_migrate/metrics.go   | 13 +++++++++++++
 cmd/go_migrate/process.go   | 37 ++++++++++++++++++++++-------------
 5 files changed, 86 insertions(+), 21 deletions(-)
 create mode 100644 cmd/go_migrate/metrics.go

diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go
index 68421eb..cb2464d 100644
--- a/cmd/go_migrate/extractor.go
+++ b/cmd/go_migrate/extractor.go
@@ -7,6 +7,7 @@ import (
 	"slices"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
@@ -36,6 +37,7 @@ func extractFromMssql(
 	chErrorsOut chan<- ExtractorError,
 	chJobErrorsOut chan<- JobError,
 	wgActiveBatches *sync.WaitGroup,
+	rowsRead *int64,
 ) {
 	indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
 		return strings.EqualFold(col.name, tableInfo.PrimaryKey)
@@ -69,7 +71,7 @@ func extractFromMssql(
 			return
 		}
 
-		if abort := processBatch(ctx, db, tableInfo, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort {
+		if abort := processBatch(ctx, db, tableInfo, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches, rowsRead); abort {
 			return
 		}
 	}
@@ -87,6 +89,7 @@ func processBatch(
 	chChunksOut chan<- Chunk,
 	chErrorsOut chan<- ExtractorError,
 	wgActiveBatches *sync.WaitGroup,
+	rowsRead *int64,
 ) (abort bool) {
 	query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
 	log.Debug("Query used to extract data from mssql: ", query)
@@ -147,6 +150,8 @@ func processBatch(
 			return true
 		}
 
+		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+
 		return false
 	}
 
@@ -164,6 +169,7 @@ func processBatch(
 			return true
 		}
 
+		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
 		rowsChunk = make([]UnknownRowValues, 0, chunkSize)
 		chunkStartTime = time.Now()
 	}
@@ -201,6 +207,8 @@ func processBatch(
 		case <-ctx.Done():
 			return true
 		}
+
+		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
 	}
 
 	wgActiveBatches.Done()
diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go
index 6024dc0..42e56af 100644
--- a/cmd/go_migrate/loader.go
+++ b/cmd/go_migrate/loader.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
@@ -25,6 +26,7 @@ func loadRowsPostgres(
 	chErrorsOut chan<- LoaderError,
 	chJobErrorsOut chan<- JobError,
 	wgActiveChunks *sync.WaitGroup,
+	rowsLoaded *int64,
 ) {
 	tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
 	colNames := Map(columns, func(col ColumnType) string {
@@ -44,7 +46,7 @@ func loadRowsPostgres(
 			return
 		}
 
-		if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks); abort {
+		if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks, rowsLoaded); abort {
 			return
 		}
 	}
@@ -60,6 +62,7 @@ func loadChunkPostgres(
 	chErrorsOut chan<- LoaderError,
 	chJobErrorsOut chan<- JobError,
 	wgActiveChunks *sync.WaitGroup,
+	rowsLoaded *int64,
 ) (abort bool) {
 	chunkStartTime := time.Now()
 	_, err := db.CopyFrom(
@@ -99,6 +102,7 @@ func loadChunkPostgres(
 	log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec)
 
+	atomic.AddInt64(rowsLoaded, int64(len(chunk.Data)))
 	wgActiveChunks.Done()
 	return false
 }
 
diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go
index 3ec7f6e..337e0b3 100644
--- a/cmd/go_migrate/main.go
+++ b/cmd/go_migrate/main.go
@@ -36,7 +36,25 @@ func main() {
 	defer sourceDb.Close()
 	defer targetDb.Close()
 
-	processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
+	results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
+
+	log.Info("=== MIGRATION SUMMARY ===")
+	var totalProcessed, totalErrors int64
+
+	for _, res := range results {
+		status := "OK"
+		if res.Error != nil {
+			status = "FAILED"
+		}
+		log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
+
+		totalProcessed += res.RowsLoaded
+		if res.Error != nil {
+			totalErrors++
+		}
+	}
+
+	log.Infof("Migration finished. Tables: %d, Errors: %d, Total rows: %d", len(results), totalErrors, totalProcessed)
 
 	totalDuration := time.Since(startTime)
 	log.Infof("=== Migration completed successfully! ===")
@@ -49,10 +67,10 @@ func processMigrationJobs(
 	targetDb *pgxpool.Pool,
 	jobs []config.Job,
 	maxParallelWorkers int,
-) {
+) []JobResult {
 	if len(jobs) == 0 {
 		log.Info("No migration jobs configured")
-		return
+		return []JobResult{}
 	}
 
 	if maxParallelWorkers <= 0 {
@@ -65,6 +83,7 @@ func processMigrationJobs(
 
 	log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
 
+	chJobResults := make(chan JobResult, len(jobs))
 	chJobs := make(chan config.Job, len(jobs))
 	var wgJobs sync.WaitGroup
 
@@ -72,7 +91,8 @@ func processMigrationJobs(
 		wgJobs.Go(func() {
 			for job := range chJobs {
 				log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
-				processMigrationJob(ctx, sourceDb, targetDb, job)
+				res := processMigrationJob(ctx, sourceDb, targetDb, job)
+				chJobResults <- res
 			}
 		})
 	}
@@ -80,8 +100,17 @@ func processMigrationJobs(
 
 	for _, job := range jobs {
 		chJobs <- job
 	}
-	close(chJobs)
 
-	wgJobs.Wait()
+	close(chJobs)
+	go func() {
+		wgJobs.Wait()
+		close(chJobResults)
+	}()
+
+	var finalResults []JobResult
+	for res := range chJobResults {
+		finalResults = append(finalResults, res)
+	}
+	return finalResults
 }
diff --git a/cmd/go_migrate/metrics.go b/cmd/go_migrate/metrics.go
new file mode 100644
index 0000000..b540c8c
--- /dev/null
+++ b/cmd/go_migrate/metrics.go
@@ -0,0 +1,13 @@
+package main
+
+import "time"
+
+type JobResult struct {
+	JobName    string
+	StartTime  time.Time
+	Duration   time.Duration
+	RowsRead   int64
+	RowsLoaded int64
+	RowsFailed int64
+	Error      error
+}
diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go
index a9d107a..e274e4d 100644
--- a/cmd/go_migrate/process.go
+++ b/cmd/go_migrate/process.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"database/sql"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
@@ -18,13 +19,18 @@ func processMigrationJob(
 	sourceDb *sql.DB,
 	targetDb *pgxpool.Pool,
 	job config.Job,
-) {
-	jobStartTime := time.Now()
-	log.Infof("Starting migration job: %s.%s [PK: %s]", job.SourceTable.Schema, job.SourceTable.Table, job.SourceTable.PrimaryKey)
+) JobResult {
+	result := JobResult{
+		JobName:   job.Name,
+		StartTime: time.Now(),
+	}
+
+	var rowsRead, rowsLoaded, rowsFailed int64
 
 	sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable)
 	if err != nil {
-		log.Fatal("Unexpected error: ", err)
+		result.Error = err
+		return result
 	}
 
 	logColumnTypes(sourceColTypes, "Source col types")
@@ -54,6 +60,7 @@ func processMigrationJob(
 	go func() {
 		if err := jobErrorHandler(jobCtx, chJobErrors); err != nil {
+			result.Error = err
 			cancel()
 		}
 	}()
 
@@ -62,11 +69,10 @@ func processMigrationJob(
 	maxExtractors := min(job.MaxExtractors, len(batches))
 
 	log.Infof("Starting %d extractor(s)...", maxExtractors)
-	extractStartTime := time.Now()
 
 	for range maxExtractors {
 		wgExtractors.Go(func() {
-			extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, job.ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
+			extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, job.ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches, &rowsRead)
 		})
 	}
 
@@ -78,7 +84,6 @@ func processMigrationJob(
 	}()
 
 	log.Infof("Starting %d transformer(s)...", maxExtractors)
-	transformStartTime := time.Now()
 
 	for range maxExtractors {
 		wgTransformers.Go(func() {
@@ -87,11 +92,10 @@ func processMigrationJob(
 		})
 	}
 
 	log.Infof("Starting %d loader(s)...", job.MaxLoaders)
-	loadStartTime := time.Now()
 
 	for range job.MaxLoaders {
 		wgLoaders.Go(func() {
-			loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
+			loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks, &rowsLoaded)
 		})
 	}
 
@@ -101,24 +105,31 @@ func processMigrationJob(
 
 		close(chExtractorErrors)
 		wgExtractors.Wait()
-		log.Infof("Extraction completed in %v", time.Since(extractStartTime))
 		close(chChunksRaw)
 
 		wgTransformers.Wait()
-		log.Infof("Transformation completed in %v", time.Since(transformStartTime))
 
 		wgActiveChunks.Wait()
 		close(chChunksTransformed)
 		close(chLoadersErrors)
 
 		wgLoaders.Wait()
-		log.Infof("Loading completed in %v", time.Since(loadStartTime))
 
 		cancel()
 	}()
 
 	<-jobCtx.Done()
-	log.Infof("Migration job completed (%s.%s). Total time: %v", job.SourceTable.Schema, job.SourceTable.Table, time.Since(jobStartTime))
+
+	if ctx.Err() != nil {
+		result.Error = ctx.Err()
+	}
+
+	result.Duration = time.Since(result.StartTime)
+	result.RowsRead = atomic.LoadInt64(&rowsRead)
+	result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
+	result.RowsFailed = atomic.LoadInt64(&rowsFailed)
+
+	return result
 }
 
 func logColumnTypes(columnTypes []ColumnType, label string) {
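
Not part of the patch: a minimal, self-contained sketch of the fan-out/fan-in pattern that processMigrationJobs and the per-job counters rely on. The job names, the jobResult struct, and the fixed row count of 42 are illustrative placeholders rather than types or values from the repository, and the sketch uses the classic WaitGroup Add/Done form instead of the newer WaitGroup.Go helper so it runs on any recent Go version. It shows the two ingredients the last main.go hunk depends on: the input channel must be closed once all jobs are queued so the workers' range loops can end, and a separate goroutine closes the result channel after the WaitGroup drains so the collection loop terminates. Shared totals are accumulated with atomic.AddInt64 from the workers and read once at the end with atomic.LoadInt64, mirroring how rowsRead and rowsLoaded are handled.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// jobResult plays the role of JobResult: one summary value per unit of work.
type jobResult struct {
	name string
	rows int64
}

func main() {
	jobs := []string{"customers", "orders", "invoices"} // placeholder job names

	chJobs := make(chan string, len(jobs))
	chResults := make(chan jobResult, len(jobs))

	var totalRows int64 // shared counter, updated the same way as rowsRead/rowsLoaded
	var wg sync.WaitGroup

	for range 2 { // two workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range chJobs { // terminates only once chJobs is closed
				atomic.AddInt64(&totalRows, 42)
				chResults <- jobResult{name: job, rows: 42}
			}
		}()
	}

	for _, j := range jobs {
		chJobs <- j
	}
	close(chJobs) // without this, workers never exit and wg.Wait() blocks forever

	go func() {
		wg.Wait()
		close(chResults) // unblocks the collection loop below
	}()

	for res := range chResults {
		fmt.Printf("%s: %d rows\n", res.name, res.rows)
	}
	fmt.Println("total rows:", atomic.LoadInt64(&totalRows))
}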