diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go
index b7e9ef2..68095d2 100644
--- a/cmd/go_migrate/extractor.go
+++ b/cmd/go_migrate/extractor.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"database/sql"
+	"time"
 
 	"github.com/jackc/pgx/v5/pgxpool"
 	_ "github.com/microsoft/go-mssqldb"
@@ -15,13 +16,18 @@ func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnTyp
 	query := buildExtractQueryMssql(job, columns)
 	log.Debug("Query used to extract data from mssql: ", query)
 
+	queryStartTime := time.Now()
 	rows, err := db.QueryContext(ctx, query)
 	if err != nil {
 		return err
 	}
 	defer rows.Close()
+	log.Debugf("Query executed in %v", time.Since(queryStartTime))
 
 	rowsChunk := make([]UnknownRowValues, 0, chunkSize)
+	totalRowsExtracted := 0
+	chunkCount := 0
+	chunkStartTime := time.Now()
 
 	for rows.Next() {
 		values := make([]any, len(columns))
@@ -36,20 +42,29 @@ func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnTyp
 		}
 
 		rowsChunk = append(rowsChunk, values)
+		totalRowsExtracted++
 
 		if len(rowsChunk) >= chunkSize {
+			chunkCount++
+			chunkDuration := time.Since(chunkStartTime)
+			rowsPerSec := float64(chunkSize) / chunkDuration.Seconds()
+			log.Infof("Extracted chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows", chunkCount, len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
 			out <- rowsChunk
 			rowsChunk = make([]UnknownRowValues, 0, chunkSize)
-			log.Debugf("Chunk send... %+v", job)
+			chunkStartTime = time.Now()
 		}
 	}
 
 	if len(rowsChunk) > 0 {
+		chunkCount++
+		chunkDuration := time.Since(chunkStartTime)
+		rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
+		log.Infof("Extracted final chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows",
+			chunkCount, len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
 		out <- rowsChunk
-		log.Debugf("Chunk send... %+v", job)
 	}
 
-	return nil
+	return rows.Err()
 }
 
 func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error {
diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go
index 6ea285f..0a9f13e 100644
--- a/cmd/go_migrate/loader.go
+++ b/cmd/go_migrate/loader.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
@@ -22,12 +23,17 @@ func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
 }
 
 func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error {
+	chunkCount := 0
+	totalRowsLoaded := 0
+
 	for rows := range in {
+		chunkStartTime := time.Now()
 		identifier := pgx.Identifier{job.Schema, job.Table}
 		colNames := Map(columns, func(col ColumnType) string {
 			return col.name
 		})
 
+		copyStartTime := time.Now()
 		_, err := db.CopyFrom(
 			ctx,
 			identifier,
@@ -38,6 +44,15 @@ func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnTyp
 		if err != nil {
 			return err
 		}
+
+		chunkCount++
+		totalRowsLoaded += len(rows)
+		copyDuration := time.Since(copyStartTime)
+		chunkDuration := time.Since(chunkStartTime)
+		rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
+
+		log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows",
+			chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
 	}
 
 	return nil
diff --git a/cmd/go_migrate/log.go b/cmd/go_migrate/log.go
index 1e7e741..a7bad26 100644
--- a/cmd/go_migrate/log.go
+++ b/cmd/go_migrate/log.go
@@ -10,6 +10,8 @@ func configureLog() {
 	log.SetFormatter(&log.TextFormatter{
 		FullTimestamp:   true,
 		TimestampFormat: time.StampMilli,
+		DisableSorting:  false,
+		PadLevelText:    true,
 	})
 	log.SetLevel(log.InfoLevel)
 }
diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go
index e8388c8..d2f0a88 100644
--- a/cmd/go_migrate/main.go
+++ b/cmd/go_migrate/main.go
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"time"
+
 	log "github.com/sirupsen/logrus"
 )
 
@@ -20,15 +22,16 @@ var migrationJobs []MigrationJob = []MigrationJob{
 
 const (
 	NumExtractors int = 1
-	NumLoaders    int = 2
-	ChunkSize     int = 50000
+	NumLoaders    int = 4
+	ChunkSize     int = 100000
 	QueueSize     int = 10
 )
 
 func main() {
 	configureLog()
-	log.Info("Starting migration...")
-	// log.Debugf("Migration jobs: %+v", migrationJobs)
+	startTime := time.Now()
+	log.Info("=== Starting migration ===")
+	log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize)
 
 	sourceDb, targetDb, connError := connectToDatabases()
 	if connError != nil {
@@ -39,9 +42,11 @@ func main() {
 	defer targetDb.Close()
 
 	for _, job := range migrationJobs {
-		log.Infof("Processing job: %+v", job)
+		log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table)
 		processMigrationJob(sourceDb, targetDb, job)
 	}
 
-	log.Info("Migration completed successfully!")
+	totalDuration := time.Since(startTime)
+	log.Infof("=== Migration completed successfully! ===")
+	log.Infof("Total migration time: %v", totalDuration)
 }
diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go
index ec5aa24..e6af049 100644
--- a/cmd/go_migrate/process.go
+++ b/cmd/go_migrate/process.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"database/sql"
 	"sync"
+	"time"
 
 	"github.com/jackc/pgx/v5/pgxpool"
 
@@ -12,6 +13,9 @@ import (
 )
 
 func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job MigrationJob) {
+	jobStartTime := time.Now()
+	log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
+
 	sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job)
 	if err != nil {
 		log.Fatal("Unexpected error: ", err)
@@ -25,20 +29,29 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
 	mssqlCtx := context.Background()
 
 	go func() {
+		log.Info("Starting extraction from MSSQL...")
+		extractStartTime := time.Now()
 		if err := extractFromMssql(mssqlCtx, job, sourceColTypes, ChunkSize, sourceDb, chRowsExtract); err != nil {
-			log.Error("Unexpected error extrating data from mssql: ", err)
+			log.Error("Unexpected error extracting data from mssql: ", err)
 		}
 		close(chRowsExtract)
+		log.Infof("Extraction completed in %v", time.Since(extractStartTime))
 	}()
 
 	go func() {
+		log.Info("Starting transformation of rows...")
+		transformStartTime := time.Now()
 		transformRowsMssql(sourceColTypes, chRowsExtract, chRowsTransform)
 		close(chRowsTransform)
+		log.Infof("Transformation completed in %v", time.Since(transformStartTime))
 	}()
 
 	var wgPostgresLoaders sync.WaitGroup
 	postgresLoaderCtx := context.Background()
 
+	log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
+	loaderStartTime := time.Now()
+
 	for range NumLoaders {
 		wgPostgresLoaders.Go(func() {
 			if err := loadRowsPostgres(postgresLoaderCtx, job, sourceColTypes, targetDb, chRowsTransform); err != nil {
@@ -48,13 +61,17 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
 	}
 
 	wgPostgresLoaders.Wait()
+	log.Infof("Loading completed in %v", time.Since(loaderStartTime))
+
+	totalDuration := time.Since(jobStartTime)
+	log.Infof("Migration job completed successfully! Total time: %v", totalDuration)
 }
 
 func logColumnTypes(columnTypes []ColumnType, label string) {
-	log.Info(label)
+	log.Debug(label)
 	for _, col := range columnTypes {
-		log.Infof("%+v", col)
+		log.Debugf("%+v", col)
 	}
 }
diff --git a/cmd/go_migrate/transformer.go b/cmd/go_migrate/transformer.go
index 330f16b..fd68e02 100644
--- a/cmd/go_migrate/transformer.go
+++ b/cmd/go_migrate/transformer.go
@@ -7,8 +7,12 @@ import (
 )
 
 func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
+	chunkCount := 0
+	totalRowsTransformed := 0
+
 	for rows := range in {
-		log.Debugf("Chunk received, transforming...")
+		chunkStartTime := time.Now()
+		log.Debugf("Chunk #%d received, transforming %d rows...", chunkCount+1, len(rows))
 
 		for _, rowValues := range rows {
 			for i, col := range columns {
@@ -29,6 +33,13 @@ func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out
 			}
 		}
 
+		chunkCount++
+		totalRowsTransformed += len(rows)
+		chunkDuration := time.Since(chunkStartTime)
+		rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
+		log.Infof("Transformed chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows",
+			chunkCount, len(rows), chunkDuration, rowsPerSec, totalRowsTransformed)
+
 		out <- rows
 	}
 }