refactor: update RangeConfig to use pointers for min and max; adjust partition calculation logic to handle nil values

This commit is contained in:
2026-05-11 11:21:59 -05:00
parent 0784458106
commit fe35d2a34c
6 changed files with 88 additions and 43 deletions

View File

@@ -68,10 +68,10 @@ type TargetTableInfo struct {
} }
type RangeConfig struct { type RangeConfig struct {
Min int64 `yaml:"min"` Min *int64 `yaml:"min"`
Max int64 `yaml:"max"` Max *int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"` IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"` IsMaxInclusive bool `yaml:"is_max_inclusive"`
} }
type Job struct { type Job struct {

View File

@@ -20,20 +20,6 @@ func PartitionRangeGenerator(
rowsPerPartition int64, rowsPerPartition int64,
jobRange config.RangeConfig, jobRange config.RangeConfig,
) ([]models.Partition, error) { ) ([]models.Partition, error) {
if jobRange.Min > 0 {
return []models.Partition{{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
Min: jobRange.Min,
Max: jobRange.Max,
IsMinInclusive: jobRange.IsMinInclusive,
IsMaxInclusive: jobRange.IsMaxInclusive,
},
}}, nil
}
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo) rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table) logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table)
if err != nil { if err != nil {
@@ -41,21 +27,33 @@ func PartitionRangeGenerator(
} }
if rowsCount <= rowsPerPartition { if rowsCount <= rowsPerPartition {
return []models.Partition{{ hasRange := jobRange.Min != nil || jobRange.Max != nil
Id: uuid.New(), partition := models.Partition{Id: uuid.New(), HasRange: hasRange, RetryCounter: 0}
HasRange: false, if hasRange {
RetryCounter: 0, var min, max int64
}}, nil if jobRange.Min != nil {
min = *jobRange.Min
}
if jobRange.Max != nil {
max = *jobRange.Max
}
partition.Range = models.PartitionRange{
Min: min,
Max: max,
IsMinInclusive: jobRange.IsMinInclusive,
IsMaxInclusive: jobRange.IsMaxInclusive,
}
}
return []models.Partition{partition}, nil
} }
partitionsCount := rowsCount / rowsPerPartition partitionsCount := rowsCount / rowsPerPartition
if partitionCalculationStrategy == "ESTIMATION" { if partitionCalculationStrategy == "ESTIMATION" {
return calculatePartitionsEstimation(ctx, tableAnalyzer, tableInfo, partitionColumn, partitionsCount) return calculatePartitionsEstimation(ctx, tableAnalyzer, tableInfo, partitionColumn, partitionsCount, jobRange)
} }
partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount) partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount, jobRange)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -71,16 +69,37 @@ func calculatePartitionsEstimation(
tableInfo config.TableInfo, tableInfo config.TableInfo,
partitionColumn string, partitionColumn string,
partitionsCount int64, partitionsCount int64,
rangeConstraint config.RangeConfig,
) ([]models.Partition, error) { ) ([]models.Partition, error) {
result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn) var minValue, maxValue int64
if err != nil {
return nil, err if rangeConstraint.Min != nil && rangeConstraint.Max != nil {
minValue = *rangeConstraint.Min
maxValue = *rangeConstraint.Max
logrus.Infof("Column range for %s.%s.%s: [%d, %d] (user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue)
} else if rangeConstraint.Min != nil || rangeConstraint.Max != nil {
result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn)
if err != nil {
return nil, err
}
if rangeConstraint.Min != nil {
minValue = *rangeConstraint.Min
maxValue = result.Max
logrus.Infof("Column range for %s.%s.%s: [%d, %d] (min user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue)
} else {
minValue = result.Min
maxValue = *rangeConstraint.Max
logrus.Infof("Column range for %s.%s.%s: [%d, %d] (max user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue)
}
} else {
result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn)
if err != nil {
return nil, err
}
logrus.Infof("Column range for %s.%s.%s: [%d, %d]", tableInfo.Schema, tableInfo.Table, partitionColumn, result.Min, result.Max)
minValue = result.Min
maxValue = result.Max
} }
logrus.Infof("Column range for %s.%s.%s: [%d, %d]", tableInfo.Schema, tableInfo.Table, partitionColumn, result.Min, result.Max)
minValue := result.Min
maxValue := result.Max
rangeSize := maxValue - minValue rangeSize := maxValue - minValue
stepSize := int64(math.Ceil(float64(rangeSize) / float64(partitionsCount))) stepSize := int64(math.Ceil(float64(rangeSize) / float64(partitionsCount)))
@@ -94,9 +113,7 @@ func calculatePartitionsEstimation(
partitionMax = maxValue partitionMax = maxValue
} }
// Only the first partition has IsMinInclusive=true to avoid overlap
isMinInclusive := i == 0 isMinInclusive := i == 0
partition := models.Partition{ partition := models.Partition{
Id: uuid.New(), Id: uuid.New(),
HasRange: true, HasRange: true,

View File

@@ -26,7 +26,7 @@ func (m *MockTableAnalyzer) QueryMaxMinFromColumn(ctx context.Context, tableInfo
return etl.MaxMinColumnResult{Min: m.minValue, Max: m.maxValue}, nil return etl.MaxMinColumnResult{Min: m.minValue, Max: m.maxValue}, nil
} }
func (m *MockTableAnalyzer) CalculatePartitionRanges(ctx context.Context, tableInfo config.TableInfo, partitionColumn string, maxPartitions int64) ([]models.Partition, error) { func (m *MockTableAnalyzer) CalculatePartitionRanges(ctx context.Context, tableInfo config.TableInfo, partitionColumn string, maxPartitions int64, rangeConstraint config.RangeConfig) ([]models.Partition, error) {
return nil, nil return nil, nil
} }
@@ -35,7 +35,7 @@ func TestCalculatePartitionsEstimation_NoOverlap(t *testing.T) {
mock := &MockTableAnalyzer{minValue: 0, maxValue: 100} mock := &MockTableAnalyzer{minValue: 0, maxValue: 100}
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 4) partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 4, config.RangeConfig{})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -66,7 +66,7 @@ func TestCalculatePartitionsEstimation_CoverageComplete(t *testing.T) {
mock := &MockTableAnalyzer{minValue: 1000, maxValue: 2000} mock := &MockTableAnalyzer{minValue: 1000, maxValue: 2000}
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 5) partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 5, config.RangeConfig{})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -86,7 +86,7 @@ func TestCalculatePartitionsEstimation_FirstPartitionInclusive(t *testing.T) {
mock := &MockTableAnalyzer{minValue: 50, maxValue: 70} mock := &MockTableAnalyzer{minValue: 50, maxValue: 70}
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 3) partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 3, config.RangeConfig{})
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@@ -224,12 +224,37 @@ func (ta *MssqlTableAnalyzer) CalculatePartitionRanges(
tableInfo config.TableInfo, tableInfo config.TableInfo,
partitionColumn string, partitionColumn string,
maxPartitions int64, maxPartitions int64,
rangeConstraint config.RangeConfig,
) ([]models.Partition, error) { ) ([]models.Partition, error) {
whereClause := ""
args := []any{sql.Named("maxPartitions", maxPartitions)}
if rangeConstraint.Min != nil || rangeConstraint.Max != nil {
var conditions []string
if rangeConstraint.Min != nil {
minOp := ">"
if rangeConstraint.IsMinInclusive {
minOp = ">="
}
conditions = append(conditions, fmt.Sprintf("[%s] %s @rangeMin", partitionColumn, minOp))
args = append(args, sql.Named("rangeMin", *rangeConstraint.Min))
}
if rangeConstraint.Max != nil {
maxOp := "<"
if rangeConstraint.IsMaxInclusive {
maxOp = "<="
}
conditions = append(conditions, fmt.Sprintf("[%s] %s @rangeMax", partitionColumn, maxOp))
args = append(args, sql.Named("rangeMax", *rangeConstraint.Max))
}
whereClause = "WHERE " + strings.Join(conditions, " AND ")
}
query := fmt.Sprintf(` query := fmt.Sprintf(`
SELECT SELECT
MIN([%s]) AS lower_limit, MIN([%s]) AS lower_limit,
MAX([%s]) AS upper_limit MAX([%s]) AS upper_limit
FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s] %s) AS T
GROUP BY batch_id GROUP BY batch_id
ORDER BY batch_id`, ORDER BY batch_id`,
partitionColumn, partitionColumn,
@@ -237,12 +262,13 @@ ORDER BY batch_id`,
partitionColumn, partitionColumn,
partitionColumn, partitionColumn,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table) tableInfo.Table,
whereClause)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel() defer cancel()
rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) rows, err := ta.db.Query(ctxTimeout, query, args...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -177,6 +177,7 @@ func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
tableInfo config.TableInfo, tableInfo config.TableInfo,
partitionColumn string, partitionColumn string,
maxPartitions int64, maxPartitions int64,
rangeConstraint config.RangeConfig,
) ([]models.Partition, error) { ) ([]models.Partition, error) {
return []models.Partition{}, nil return []models.Partition{}, nil
} }

View File

@@ -56,5 +56,6 @@ type TableAnalyzer interface {
tableInfo config.TableInfo, tableInfo config.TableInfo,
partitionColumn string, partitionColumn string,
maxPartitions int64, maxPartitions int64,
rangeConstraint config.RangeConfig,
) ([]models.Partition, error) ) ([]models.Partition, error)
} }