feat: implement batch processing for MSSQL extraction and transformation with range handling

This commit is contained in:
2026-04-07 23:39:55 -05:00
parent 8903a04f4d
commit 8afdb45318
5 changed files with 164 additions and 27 deletions

View File

@@ -12,12 +12,20 @@ import (
type UnknownRowValues = []any
func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *sql.DB, out chan<- []UnknownRowValues) error {
query := buildExtractQueryMssql(job, columns)
func extractFromMssql(ctx context.Context, db *sql.DB, job MigrationJob, columns []ColumnType, chunkSize int, batchRange BatchRange, out chan<- []UnknownRowValues) error {
query := buildExtractQueryMssql(job, columns, batchRange.validRange)
log.Debug("Query used to extract data from mssql: ", query)
var queryArgs []any
if batchRange.validRange {
queryArgs = append(queryArgs,
sql.Named("minRange", batchRange.LowerLimit),
sql.Named("maxRange", batchRange.UpperLimit),
)
}
queryStartTime := time.Now()
rows, err := db.QueryContext(ctx, query)
rows, err := db.QueryContext(ctx, query, queryArgs...)
if err != nil {
return err
}