feat: enhance batch processing by adding rowsPerBatch parameter and improving logging messages

This commit is contained in:
2026-04-09 19:46:45 -05:00
parent 524d892a60
commit 0d9f955b2f
3 changed files with 17 additions and 17 deletions

View File

@@ -91,15 +91,15 @@ ORDER BY batch_id`,
return batches, nil return batches, nil
} }
func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) ([]Batch, error) { func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]Batch, error) {
rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo) rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var batchCount int64 = 1 var batchCount int64 = 1
if rowsCount > RowsPerBatch { if rowsCount > rowsPerBatch {
batchCount = rowsCount / RowsPerBatch batchCount = rowsCount / rowsPerBatch
} else { } else {
return []Batch{{ return []Batch{{
Id: uuid.New(), Id: uuid.New(),

View File

@@ -76,7 +76,7 @@ func loadChunkPostgres(
select { select {
case chJobErrorsOut <- JobError{ case chJobErrorsOut <- JobError{
ShouldCancelJob: true, ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal data integrity error in table %s", identifier.Sanitize()), Msg: fmt.Sprintf("Fatal error in table %s", identifier.Sanitize()),
Prev: err, Prev: err,
}: }:
case <-ctx.Done(): case <-ctx.Done():

View File

@@ -33,17 +33,17 @@ func processMigrationJob(
jobCtx, cancel := context.WithCancel(ctx) jobCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable) batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerBatch)
if err != nil { if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
} }
chJobErrors := make(chan JobError, 50) chJobErrors := make(chan JobError, job.QueueSize)
chBatches := make(chan Batch, QueueSize) chBatches := make(chan Batch, job.QueueSize)
chExtractorErrors := make(chan ExtractorError, QueueSize) chExtractorErrors := make(chan ExtractorError, job.QueueSize)
chChunksRaw := make(chan Chunk, QueueSize) chChunksRaw := make(chan Chunk, job.QueueSize)
chChunksTransformed := make(chan Chunk, QueueSize) chChunksTransformed := make(chan Chunk, job.QueueSize)
chLoadersErrors := make(chan LoaderError, QueueSize) chLoadersErrors := make(chan LoaderError, job.QueueSize)
var wgActiveBatches sync.WaitGroup var wgActiveBatches sync.WaitGroup
var wgActiveChunks sync.WaitGroup var wgActiveChunks sync.WaitGroup
@@ -60,13 +60,13 @@ func processMigrationJob(
go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
maxExtractors := min(NumExtractors, len(batches)) maxExtractors := min(job.MaxExtractors, len(batches))
log.Infof("Starting %d extractors...", maxExtractors) log.Infof("Starting %d extractor(s)...", maxExtractors)
extractStartTime := time.Now() extractStartTime := time.Now()
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches) extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, job.ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
}) })
} }
@@ -77,7 +77,7 @@ func processMigrationJob(
} }
}() }()
log.Infof("Starting %d transformers...", maxExtractors) log.Infof("Starting %d transformer(s)...", maxExtractors)
transformStartTime := time.Now() transformStartTime := time.Now()
for range maxExtractors { for range maxExtractors {
@@ -86,10 +86,10 @@ func processMigrationJob(
}) })
} }
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders) log.Infof("Starting %d loader(s)...", job.MaxLoaders)
loadStartTime := time.Now() loadStartTime := time.Now()
for range NumLoaders { for range job.MaxLoaders {
wgLoaders.Go(func() { wgLoaders.Go(func() {
loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks) loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
}) })