diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go index abc4c8e..58a919d 100644 --- a/cmd/go_migrate/loader.go +++ b/cmd/go_migrate/loader.go @@ -12,24 +12,11 @@ import ( log "github.com/sirupsen/logrus" ) -func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) { - - for rows := range in { - log.Debugf("Chunk received, loading data into...") - - for i, rowValues := range rows { - if i%100 == 0 { - logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i)) - } - } - } -} - -func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error { +func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, chChunksIn <-chan Chunk) error { chunkCount := 0 totalRowsLoaded := 0 - for rows := range in { + for chunk := range chChunksIn { chunkStartTime := time.Now() identifier := pgx.Identifier{job.Schema, job.Table} colNames := Map(columns, func(col ColumnType) string { @@ -41,7 +28,7 @@ func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnTyp ctx, identifier, colNames, - pgx.CopyFromRows(rows), + pgx.CopyFromRows(chunk.Data), ) if err != nil { @@ -49,12 +36,12 @@ func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnTyp } chunkCount++ - totalRowsLoaded += len(rows) + totalRowsLoaded += len(chunk.Data) copyDuration := time.Since(copyStartTime) chunkDuration := time.Since(chunkStartTime) - rowsPerSec := float64(len(rows)) / chunkDuration.Seconds() + rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds() - log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded) + log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(chunk.Data), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded) } return nil @@ -134,3 +121,16 @@ func Map[T any, V any](input []T, mapper func(T) V) []V { return result } + +func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) { + + for rows := range in { + log.Debugf("Chunk received, loading data into...") + + for i, rowValues := range rows { + if i%100 == 0 { + logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i)) + } + } + } +} diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 5ecc68d..27e82ad 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -74,7 +74,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration log.Infof("Extraction completed in %v", time.Since(extractStartTime)) }() - chChunksTransform := make(chan []UnknownRowValues, QueueSize) + chChunksTransform := make(chan Chunk, QueueSize) var wgMssqlTransformers sync.WaitGroup log.Infof("Starting %d MSSQL transformers...", maxExtractors) diff --git a/cmd/go_migrate/transformer.go b/cmd/go_migrate/transformer.go index 5fa850a..fc39349 100644 --- a/cmd/go_migrate/transformer.go +++ b/cmd/go_migrate/transformer.go @@ -9,7 +9,7 @@ import ( func transformRowsMssql( columns []ColumnType, chChunksIn <-chan Chunk, - chChunksOut chan<- []UnknownRowValues, + chChunksOut chan<- Chunk, chJobErrorsOut chan<- JobError, ) { chunkCount := 0 @@ -67,6 +67,6 @@ func transformRowsMssql( log.Infof("Transformed chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(chunk.Data), chunkDuration, rowsPerSec, totalRowsTransformed) - chChunksOut <- chunk.Data + chChunksOut <- chunk } }