feat: enhance range handling in MSSQL and Postgres extractors; update partition range generator logic

This commit is contained in:
2026-04-21 11:32:52 -05:00
parent 9eb8800864
commit aa71eeb5c1
3 changed files with 69 additions and 38 deletions

View File

@@ -30,6 +30,8 @@ func buildExtractQueryMssql(
includeRange bool, includeRange bool,
isMinInclusive bool, isMinInclusive bool,
isMaxInclusive bool, isMaxInclusive bool,
hasMin bool,
hasMax bool,
) string { ) string {
var sbQuery strings.Builder var sbQuery strings.Builder
@@ -53,24 +55,33 @@ func buildExtractQueryMssql(
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table) fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
if includeRange { if includeRange && (hasMin || hasMax) {
fmt.Fprintf(&sbQuery, " WHERE [%s]", tableInfo.PrimaryKey) sbQuery.WriteString(" WHERE ")
if hasMin {
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
if isMinInclusive { if isMinInclusive {
sbQuery.WriteString(" >=") sbQuery.WriteString(" >=")
} else { } else {
sbQuery.WriteString(" >") sbQuery.WriteString(" >")
} }
sbQuery.WriteString(" @min")
}
sbQuery.WriteString(" @min AND ") if hasMin && hasMax {
sbQuery.WriteString(" AND ")
}
if hasMax {
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey) fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
if isMaxInclusive { if isMaxInclusive {
sbQuery.WriteString(" <=") sbQuery.WriteString(" <=")
} else { } else {
sbQuery.WriteString(" <") sbQuery.WriteString(" <")
} }
sbQuery.WriteString(" @max") sbQuery.WriteString(" @max")
} }
}
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey) fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
@@ -114,14 +125,16 @@ func (mssqlEx *MssqlExtractor) Exec(
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int, error) { ) (int, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive) hasMin := partition.HasRange && partition.Range.Min > 0
hasMax := partition.HasRange && partition.Range.Max > 0
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
var queryArgs []any var queryArgs []any
if partition.HasRange { if hasMin {
queryArgs = append(queryArgs, queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min))
sql.Named("min", partition.Range.Min), }
sql.Named("max", partition.Range.Max), if hasMax {
) queryArgs = append(queryArgs, sql.Named("max", partition.Range.Max))
} }
rowsRead := 0 rowsRead := 0

View File

@@ -27,6 +27,8 @@ func buildExtractQueryPostgres(
includeRange bool, includeRange bool,
isMinInclusive bool, isMinInclusive bool,
isMaxInclusive bool, isMaxInclusive bool,
hasMin bool,
hasMax bool,
) string { ) string {
var sbColumns strings.Builder var sbColumns strings.Builder
@@ -54,20 +56,34 @@ func buildExtractQueryPostgres(
query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table) query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table)
if includeRange { if includeRange && (hasMin || hasMax) {
query += fmt.Sprintf(` WHERE "%s"`, sourceDbInfo.PrimaryKey) query += " WHERE "
paramIdx := 1
if hasMin {
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
if isMinInclusive { if isMinInclusive {
query += " >=" query += " >="
} else { } else {
query += " >" query += " >"
} }
query += " $1 AND " + fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey) query += fmt.Sprintf(" $%d", paramIdx)
paramIdx++
}
if hasMin && hasMax {
query += " AND "
}
if hasMax {
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
if isMaxInclusive { if isMaxInclusive {
query += " <=" query += " <="
} else { } else {
query += " <" query += " <"
} }
query += " $2" query += fmt.Sprintf(" $%d", paramIdx)
}
} }
query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey) query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey)
@@ -84,14 +100,16 @@ func (postgresEx *PostgresExtractor) Exec(
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int, error) { ) (int, error) {
query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive) hasMin := partition.HasRange && partition.Range.Min > 0
hasMax := partition.HasRange && partition.Range.Max > 0
query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
var queryArgs []any var queryArgs []any
if partition.HasRange { if hasMin {
queryArgs = append(queryArgs, queryArgs = append(queryArgs, partition.Range.Min)
partition.Range.Min, }
partition.Range.Max, if hasMax {
) queryArgs = append(queryArgs, partition.Range.Max)
} }
rowsRead := 0 rowsRead := 0

View File

@@ -17,7 +17,7 @@ func PartitionRangeGenerator(
rowsPerPartition int64, rowsPerPartition int64,
jobRange config.RangeConfig, jobRange config.RangeConfig,
) ([]models.Partition, error) { ) ([]models.Partition, error) {
if jobRange.Max > 0 { if jobRange.Min > 0 {
return []models.Partition{{ return []models.Partition{{
Id: uuid.New(), Id: uuid.New(),
HasRange: true, HasRange: true,