feat: refactor chunk handling in loader and transformer for improved data processing

This commit is contained in:
2026-04-08 21:11:26 -05:00
parent f6dfcd390f
commit 7924dd3af7
3 changed files with 22 additions and 22 deletions

View File

@@ -12,24 +12,11 @@ import (
log "github.com/sirupsen/logrus"
)
// fakeLoader drains row chunks from the channel without persisting them.
// It logs a sample row for every 100th row in each chunk so the pipeline
// can be exercised end-to-end without a real load target.
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
	for chunk := range in {
		log.Debugf("Chunk received, loading data into...")
		for idx, values := range chunk {
			if idx%100 != 0 {
				continue
			}
			logSampleRow(job, columns, values, fmt.Sprintf("row %d", idx))
		}
	}
}
func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error {
func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, chChunksIn <-chan Chunk) error {
chunkCount := 0
totalRowsLoaded := 0
for rows := range in {
for chunk := range chChunksIn {
chunkStartTime := time.Now()
identifier := pgx.Identifier{job.Schema, job.Table}
colNames := Map(columns, func(col ColumnType) string {
@@ -41,7 +28,7 @@ func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnTyp
ctx,
identifier,
colNames,
pgx.CopyFromRows(rows),
pgx.CopyFromRows(chunk.Data),
)
if err != nil {
@@ -49,12 +36,12 @@ func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnTyp
}
chunkCount++
totalRowsLoaded += len(rows)
totalRowsLoaded += len(chunk.Data)
copyDuration := time.Since(copyStartTime)
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds()
log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(chunk.Data), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
}
return nil
@@ -134,3 +121,16 @@ func Map[T any, V any](input []T, mapper func(T) V) []V {
return result
}
// fakeLoader consumes incoming chunks and discards them, emitting a debug
// line per chunk and a sample of every 100th row; useful as a dry-run sink
// when no database is available.
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
	for chunk := range in {
		log.Debugf("Chunk received, loading data into...")
		for idx, values := range chunk {
			if idx%100 != 0 {
				continue
			}
			logSampleRow(job, columns, values, fmt.Sprintf("row %d", idx))
		}
	}
}

View File

@@ -74,7 +74,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
log.Infof("Extraction completed in %v", time.Since(extractStartTime))
}()
chChunksTransform := make(chan []UnknownRowValues, QueueSize)
chChunksTransform := make(chan Chunk, QueueSize)
var wgMssqlTransformers sync.WaitGroup
log.Infof("Starting %d MSSQL transformers...", maxExtractors)

View File

@@ -9,7 +9,7 @@ import (
func transformRowsMssql(
columns []ColumnType,
chChunksIn <-chan Chunk,
chChunksOut chan<- []UnknownRowValues,
chChunksOut chan<- Chunk,
chJobErrorsOut chan<- JobError,
) {
chunkCount := 0
@@ -67,6 +67,6 @@ func transformRowsMssql(
log.Infof("Transformed chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
len(chunk.Data), chunkDuration, rowsPerSec, totalRowsTransformed)
chChunksOut <- chunk.Data
chChunksOut <- chunk
}
}