feat: enhance migration job processing with detailed metrics and error handling

This commit is contained in:
2026-04-09 21:55:19 -05:00
parent 1db35c796c
commit 6345a0d694
5 changed files with 86 additions and 21 deletions

View File

@@ -7,6 +7,7 @@ import (
"slices" "slices"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
@@ -36,6 +37,7 @@ func extractFromMssql(
chErrorsOut chan<- ExtractorError, chErrorsOut chan<- ExtractorError,
chJobErrorsOut chan<- JobError, chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup, wgActiveBatches *sync.WaitGroup,
rowsRead *int64,
) { ) {
indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool { indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
return strings.EqualFold(col.name, tableInfo.PrimaryKey) return strings.EqualFold(col.name, tableInfo.PrimaryKey)
@@ -69,7 +71,7 @@ func extractFromMssql(
return return
} }
if abort := processBatch(ctx, db, tableInfo, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort { if abort := processBatch(ctx, db, tableInfo, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches, rowsRead); abort {
return return
} }
} }
@@ -87,6 +89,7 @@ func processBatch(
chChunksOut chan<- Chunk, chChunksOut chan<- Chunk,
chErrorsOut chan<- ExtractorError, chErrorsOut chan<- ExtractorError,
wgActiveBatches *sync.WaitGroup, wgActiveBatches *sync.WaitGroup,
rowsRead *int64,
) (abort bool) { ) (abort bool) {
query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive) query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
log.Debug("Query used to extract data from mssql: ", query) log.Debug("Query used to extract data from mssql: ", query)
@@ -147,6 +150,8 @@ func processBatch(
return true return true
} }
atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
return false return false
} }
@@ -164,6 +169,7 @@ func processBatch(
return true return true
} }
atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
rowsChunk = make([]UnknownRowValues, 0, chunkSize) rowsChunk = make([]UnknownRowValues, 0, chunkSize)
chunkStartTime = time.Now() chunkStartTime = time.Now()
} }
@@ -201,6 +207,8 @@ func processBatch(
case <-ctx.Done(): case <-ctx.Done():
return true return true
} }
atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
} }
wgActiveBatches.Done() wgActiveBatches.Done()

View File

@@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
@@ -25,6 +26,7 @@ func loadRowsPostgres(
chErrorsOut chan<- LoaderError, chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError, chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup, wgActiveChunks *sync.WaitGroup,
rowsLoaded *int64,
) { ) {
tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
colNames := Map(columns, func(col ColumnType) string { colNames := Map(columns, func(col ColumnType) string {
@@ -44,7 +46,7 @@ func loadRowsPostgres(
return return
} }
if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks); abort { if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks, rowsLoaded); abort {
return return
} }
} }
@@ -60,6 +62,7 @@ func loadChunkPostgres(
chErrorsOut chan<- LoaderError, chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError, chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup, wgActiveChunks *sync.WaitGroup,
rowsLoaded *int64,
) (abort bool) { ) (abort bool) {
chunkStartTime := time.Now() chunkStartTime := time.Now()
_, err := db.CopyFrom( _, err := db.CopyFrom(
@@ -99,6 +102,7 @@ func loadChunkPostgres(
log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec) log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec)
atomic.AddInt64(rowsLoaded, int64(len(chunk.Data)))
wgActiveChunks.Done() wgActiveChunks.Done()
return false return false
} }

View File

@@ -36,7 +36,25 @@ func main() {
defer sourceDb.Close() defer sourceDb.Close()
defer targetDb.Close() defer targetDb.Close()
processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers) results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
log.Info("=== RESUMEN DE MIGRACIÓN ===")
var totalProcessed, totalErrors int64
for _, res := range results {
status := "OK"
if res.Error != nil {
status = "FAILED"
}
log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
totalProcessed += res.RowsLoaded
if res.Error != nil {
totalErrors++
}
}
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
totalDuration := time.Since(startTime) totalDuration := time.Since(startTime)
log.Infof("=== Migration completed successfully! ===") log.Infof("=== Migration completed successfully! ===")
@@ -49,10 +67,10 @@ func processMigrationJobs(
targetDb *pgxpool.Pool, targetDb *pgxpool.Pool,
jobs []config.Job, jobs []config.Job,
maxParallelWorkers int, maxParallelWorkers int,
) { ) []JobResult {
if len(jobs) == 0 { if len(jobs) == 0 {
log.Info("No migration jobs configured") log.Info("No migration jobs configured")
return return []JobResult{}
} }
if maxParallelWorkers <= 0 { if maxParallelWorkers <= 0 {
@@ -65,6 +83,7 @@ func processMigrationJobs(
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers) log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs)) chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup var wgJobs sync.WaitGroup
@@ -72,7 +91,8 @@ func processMigrationJobs(
wgJobs.Go(func() { wgJobs.Go(func() {
for job := range chJobs { for job := range chJobs {
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
processMigrationJob(ctx, sourceDb, targetDb, job) res := processMigrationJob(ctx, sourceDb, targetDb, job)
chJobResults <- res
} }
}) })
} }
@@ -80,8 +100,17 @@ func processMigrationJobs(
for _, job := range jobs { for _, job := range jobs {
chJobs <- job chJobs <- job
} }
close(chJobs) close(chJobs)
go func() {
wgJobs.Wait() wgJobs.Wait()
close(chJobResults)
}()
var finalResults []JobResult
for res := range chJobResults {
finalResults = append(finalResults, res)
}
return finalResults
} }

13
cmd/go_migrate/metrics.go Normal file
View File

@@ -0,0 +1,13 @@
package main
import "time"
// JobResult holds per-job outcome metrics for one migration job,
// produced by processMigrationJob and aggregated in the final summary.
type JobResult struct {
// JobName identifies the job (taken from config.Job.Name).
JobName string
// StartTime is when the job began processing.
StartTime time.Time
// Duration is the total wall-clock time the job took.
Duration time.Duration
// RowsRead counts rows extracted from the source (updated atomically by extractors).
RowsRead int64
// RowsLoaded counts rows written to the target (updated atomically by loaders).
RowsLoaded int64
// RowsFailed counts rows that could not be migrated.
// NOTE(review): nothing in the visible diff ever increments this — confirm it is wired up.
RowsFailed int64
// Error is the first fatal error encountered, or nil on success.
Error error
}

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
@@ -18,13 +19,18 @@ func processMigrationJob(
sourceDb *sql.DB, sourceDb *sql.DB,
targetDb *pgxpool.Pool, targetDb *pgxpool.Pool,
job config.Job, job config.Job,
) { ) JobResult {
jobStartTime := time.Now() result := JobResult{
log.Infof("Starting migration job: %s.%s [PK: %s]", job.SourceTable.Schema, job.SourceTable.Table, job.SourceTable.PrimaryKey) JobName: job.Name,
StartTime: time.Now(),
}
var rowsRead, rowsLoaded, rowsFailed int64
sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable) sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable)
if err != nil { if err != nil {
log.Fatal("Unexpected error: ", err) result.Error = err
return result
} }
logColumnTypes(sourceColTypes, "Source col types") logColumnTypes(sourceColTypes, "Source col types")
@@ -54,6 +60,7 @@ func processMigrationJob(
go func() { go func() {
if err := jobErrorHandler(jobCtx, chJobErrors); err != nil { if err := jobErrorHandler(jobCtx, chJobErrors); err != nil {
cancel() cancel()
result.Error = err
} }
}() }()
@@ -62,11 +69,10 @@ func processMigrationJob(
maxExtractors := min(job.MaxExtractors, len(batches)) maxExtractors := min(job.MaxExtractors, len(batches))
log.Infof("Starting %d extractor(s)...", maxExtractors) log.Infof("Starting %d extractor(s)...", maxExtractors)
extractStartTime := time.Now()
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, job.ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches) extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, job.ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches, &rowsRead)
}) })
} }
@@ -78,7 +84,6 @@ func processMigrationJob(
}() }()
log.Infof("Starting %d transformer(s)...", maxExtractors) log.Infof("Starting %d transformer(s)...", maxExtractors)
transformStartTime := time.Now()
for range maxExtractors { for range maxExtractors {
wgTransformers.Go(func() { wgTransformers.Go(func() {
@@ -87,11 +92,10 @@ func processMigrationJob(
} }
log.Infof("Starting %d loader(s)...", job.MaxLoaders) log.Infof("Starting %d loader(s)...", job.MaxLoaders)
loadStartTime := time.Now()
for range job.MaxLoaders { for range job.MaxLoaders {
wgLoaders.Go(func() { wgLoaders.Go(func() {
loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks) loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks, &rowsLoaded)
}) })
} }
@@ -101,24 +105,31 @@ func processMigrationJob(
close(chExtractorErrors) close(chExtractorErrors)
wgExtractors.Wait() wgExtractors.Wait()
log.Infof("Extraction completed in %v", time.Since(extractStartTime))
close(chChunksRaw) close(chChunksRaw)
wgTransformers.Wait() wgTransformers.Wait()
log.Infof("Transformation completed in %v", time.Since(transformStartTime))
wgActiveChunks.Wait() wgActiveChunks.Wait()
close(chChunksTransformed) close(chChunksTransformed)
close(chLoadersErrors) close(chLoadersErrors)
wgLoaders.Wait() wgLoaders.Wait()
log.Infof("Loading completed in %v", time.Since(loadStartTime))
cancel() cancel()
}() }()
<-jobCtx.Done() <-jobCtx.Done()
log.Infof("Migration job completed (%s.%s). Total time: %v", job.SourceTable.Schema, job.SourceTable.Table, time.Since(jobStartTime))
if ctx.Err() != nil {
result.Error = ctx.Err()
}
result.Duration = time.Since(result.StartTime)
result.RowsRead = atomic.LoadInt64(&rowsRead)
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
result.RowsFailed = atomic.LoadInt64(&rowsFailed)
return result
} }
func logColumnTypes(columnTypes []ColumnType, label string) { func logColumnTypes(columnTypes []ColumnType, label string) {