feat: enhance logging and performance metrics for migration processes

Commit date: 2026-04-07 15:53:17 -05:00
parent 7bb67ddfcf
commit 270a66dbbf
6 changed files with 78 additions and 13 deletions

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
@@ -22,12 +23,17 @@ func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
}
func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error {
chunkCount := 0
totalRowsLoaded := 0
for rows := range in {
chunkStartTime := time.Now()
identifier := pgx.Identifier{job.Schema, job.Table}
colNames := Map(columns, func(col ColumnType) string {
return col.name
})
copyStartTime := time.Now()
_, err := db.CopyFrom(
ctx,
identifier,
@@ -38,6 +44,15 @@ func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnTyp
if err != nil {
return err
}
chunkCount++
totalRowsLoaded += len(rows)
copyDuration := time.Since(copyStartTime)
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows",
chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
}
return nil