refactor: carry extracted row chunks as Chunk values (chunk id, batch id, retry counter) through the extractor and transformer

2026-04-08 21:09:26 -05:00
parent 853be4a5a6
commit f6dfcd390f
3 changed files with 41 additions and 14 deletions

View File

@@ -7,6 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb" _ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -14,6 +15,13 @@ import (
 type UnknownRowValues = []any
 
+type Chunk struct {
+	Id           uuid.UUID
+	BatchId      uuid.UUID
+	Data         []UnknownRowValues
+	RetryCounter int
+}
+
 func extractFromMssql(
 	ctx context.Context,
 	db *sql.DB,
@@ -21,7 +29,7 @@ func extractFromMssql(
 	columns []ColumnType,
 	chunkSize int,
 	chBatchesIn <-chan Batch,
-	chChunksOut chan<- []UnknownRowValues,
+	chChunksOut chan<- Chunk,
 	chErrorsOut chan<- ExtractorError,
 	chJobErrorsOut chan<- JobError,
 ) {
@@ -34,7 +42,11 @@ func extractFromMssql(
 			ShouldCancelJob: true,
 			Msg:             "Primary key not found in provided columns",
 		}
-		chJobErrorsOut <- jobError
+		select {
+		case chJobErrorsOut <- jobError:
+		case <-ctx.Done():
+			return
+		}
 		return
 	}
@@ -89,8 +101,13 @@ func extractFromMssql(
 		}
 		lastRow := rowsChunk[len(rowsChunk)-1]
-		chChunksOut <- rowsChunk
 		chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
+		chChunksOut <- Chunk{
+			Id:           uuid.New(),
+			BatchId:      batch.Id,
+			Data:         rowsChunk,
+			RetryCounter: 0,
+		}
 		return
 	}
@@ -102,7 +119,12 @@ func extractFromMssql(
 			rowsPerSec := float64(chunkSize) / chunkDuration.Seconds()
 			log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
 				len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
-			chChunksOut <- rowsChunk
+			chChunksOut <- Chunk{
+				Id:           uuid.New(),
+				BatchId:      batch.Id,
+				Data:         rowsChunk,
+				RetryCounter: 0,
+			}
 			rowsChunk = make([]UnknownRowValues, 0, chunkSize)
 			chunkStartTime = time.Now()
 		}
@@ -113,7 +135,12 @@ func extractFromMssql(
 		rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
 		log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
 			len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
-		chChunksOut <- rowsChunk
+		chChunksOut <- Chunk{
+			Id:           uuid.New(),
+			BatchId:      batch.Id,
+			Data:         rowsChunk,
+			RetryCounter: 0,
+		}
 	}
 	if err := rows.Err(); err != nil {
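
Note on the error-send change in the hunk at line 42 above: the bare chJobErrorsOut <- jobError send is replaced with a select that also watches ctx.Done(), so a cancelled job can no longer leave the extractor blocked forever on a full error channel. Below is a minimal, self-contained sketch of that pattern; produce, out, and the timeout are illustrative names and values, not code from this repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// produce tries to deliver v on out, but gives up once ctx is cancelled,
// so a cancelled job cannot leave the sender blocked on a full channel.
func produce(ctx context.Context, out chan<- int, v int) {
	select {
	case out <- v:
		fmt.Println("delivered", v)
	case <-ctx.Done():
		fmt.Println("send abandoned:", ctx.Err())
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	out := make(chan int) // unbuffered and never read from in this sketch
	go produce(ctx, out, 42)

	time.Sleep(100 * time.Millisecond) // give produce time to hit ctx.Done()
}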

View File

@@ -48,7 +48,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
 		extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors)
 	}()
-	chChunks := make(chan []UnknownRowValues, QueueSize)
+	chChunks := make(chan Chunk, QueueSize)
 	maxExtractors := min(NumExtractors, len(batches))
 	var wgMssqlExtractors sync.WaitGroup

View File

@@ -8,18 +8,18 @@ import (
 func transformRowsMssql(
 	columns []ColumnType,
-	chChunksIn <-chan []UnknownRowValues,
+	chChunksIn <-chan Chunk,
 	chChunksOut chan<- []UnknownRowValues,
 	chJobErrorsOut chan<- JobError,
 ) {
 	chunkCount := 0
 	totalRowsTransformed := 0
-	for rows := range chChunksIn {
+	for chunk := range chChunksIn {
 		chunkStartTime := time.Now()
-		log.Debugf("Chunk received, transforming %d rows...", len(rows))
+		log.Debugf("Chunk received, transforming %d rows...", len(chunk.Data))
-		for _, rowValues := range rows {
+		for _, rowValues := range chunk.Data {
 			for i, col := range columns {
 				value := rowValues[i]
@@ -61,12 +61,12 @@ func transformRowsMssql(
 		}
 		chunkCount++
-		totalRowsTransformed += len(rows)
+		totalRowsTransformed += len(chunk.Data)
 		chunkDuration := time.Since(chunkStartTime)
-		rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
+		rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds()
 		log.Infof("Transformed chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
-			len(rows), chunkDuration, rowsPerSec, totalRowsTransformed)
+			len(chunk.Data), chunkDuration, rowsPerSec, totalRowsTransformed)
-		chChunksOut <- rows
+		chChunksOut <- chunk.Data
 	}
 }
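
Taken together, the three files change the extractor-to-transformer handoff from bare []UnknownRowValues slices to Chunk values carrying a chunk id, the originating batch id, and a retry counter. Below is a self-contained sketch of that handoff under assumed names and channel sizes; the goroutine wiring is illustrative only and is not the repository's processMigrationJob.

package main

import (
	"fmt"

	"github.com/google/uuid"
)

type UnknownRowValues = []any

type Chunk struct {
	Id           uuid.UUID
	BatchId      uuid.UUID
	Data         []UnknownRowValues
	RetryCounter int
}

func main() {
	batchId := uuid.New()
	chChunks := make(chan Chunk, 2)            // extractor -> transformer
	chRows := make(chan []UnknownRowValues, 2) // transformer -> loader

	// Extractor side: wrap the raw row slice in a Chunk with identity metadata.
	go func() {
		defer close(chChunks)
		rows := []UnknownRowValues{{1, "a"}, {2, "b"}}
		chChunks <- Chunk{Id: uuid.New(), BatchId: batchId, Data: rows, RetryCounter: 0}
	}()

	// Transformer side: range over Chunks, forward only the row data downstream.
	go func() {
		defer close(chRows)
		for chunk := range chChunks {
			fmt.Printf("chunk %s (batch %s): %d rows\n", chunk.Id, chunk.BatchId, len(chunk.Data))
			chRows <- chunk.Data
		}
	}()

	for rows := range chRows {
		fmt.Println("loader received", len(rows), "rows")
	}
}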