feat: enhance concurrency management by adding WaitGroup support in extractors and loaders

This commit is contained in:
2026-04-09 00:22:30 -05:00
parent dc632361e5
commit 51480015ba
8 changed files with 82 additions and 68 deletions

View File

@@ -6,6 +6,7 @@ import (
"errors"
"slices"
"strings"
"sync"
"time"
"github.com/google/uuid"
@@ -33,6 +34,7 @@ func extractFromMssql(
chChunksOut chan<- Chunk,
chErrorsOut chan<- ExtractorError,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
return strings.EqualFold(col.name, job.PrimaryKey)
@@ -66,7 +68,7 @@ func extractFromMssql(
return
}
if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut); abort {
if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort {
return
}
}
@@ -83,6 +85,7 @@ func processBatch(
indexPrimaryKey int,
chChunksOut chan<- Chunk,
chErrorsOut chan<- ExtractorError,
wgActiveBatches *sync.WaitGroup,
) (abort bool) {
query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
log.Debug("Query used to extract data from mssql: ", query)
@@ -199,6 +202,7 @@ func processBatch(
}
}
wgActiveBatches.Done()
return false
}