diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index f30646d..e520375 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -68,10 +68,10 @@ type TargetTableInfo struct { } type RangeConfig struct { - Min int64 `yaml:"min"` - Max int64 `yaml:"max"` - IsMinInclusive bool `yaml:"is_min_inclusive"` - IsMaxInclusive bool `yaml:"is_max_inclusive"` + Min *int64 `yaml:"min"` + Max *int64 `yaml:"max"` + IsMinInclusive bool `yaml:"is_min_inclusive"` + IsMaxInclusive bool `yaml:"is_max_inclusive"` } type Job struct { diff --git a/internal/app/etl/table_analyzers/main.go b/internal/app/etl/table_analyzers/main.go index 311134f..e1d22cb 100644 --- a/internal/app/etl/table_analyzers/main.go +++ b/internal/app/etl/table_analyzers/main.go @@ -20,20 +20,6 @@ func PartitionRangeGenerator( rowsPerPartition int64, jobRange config.RangeConfig, ) ([]models.Partition, error) { - if jobRange.Min > 0 { - return []models.Partition{{ - Id: uuid.New(), - HasRange: true, - RetryCounter: 0, - Range: models.PartitionRange{ - Min: jobRange.Min, - Max: jobRange.Max, - IsMinInclusive: jobRange.IsMinInclusive, - IsMaxInclusive: jobRange.IsMaxInclusive, - }, - }}, nil - } - rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo) logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table) if err != nil { @@ -41,21 +27,33 @@ func PartitionRangeGenerator( } if rowsCount <= rowsPerPartition { - return []models.Partition{{ - Id: uuid.New(), - HasRange: false, - RetryCounter: 0, - }}, nil - + hasRange := jobRange.Min != nil || jobRange.Max != nil + partition := models.Partition{Id: uuid.New(), HasRange: hasRange, RetryCounter: 0} + if hasRange { + var min, max int64 + if jobRange.Min != nil { + min = *jobRange.Min + } + if jobRange.Max != nil { + max = *jobRange.Max + } + partition.Range = models.PartitionRange{ + Min: min, + Max: max, + IsMinInclusive: jobRange.IsMinInclusive, + IsMaxInclusive: jobRange.IsMaxInclusive, + } + } + return []models.Partition{partition}, nil } partitionsCount := rowsCount / rowsPerPartition if partitionCalculationStrategy == "ESTIMATION" { - return calculatePartitionsEstimation(ctx, tableAnalyzer, tableInfo, partitionColumn, partitionsCount) + return calculatePartitionsEstimation(ctx, tableAnalyzer, tableInfo, partitionColumn, partitionsCount, jobRange) } - partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount) + partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount, jobRange) if err != nil { return nil, err } @@ -71,16 +69,37 @@ func calculatePartitionsEstimation( tableInfo config.TableInfo, partitionColumn string, partitionsCount int64, + rangeConstraint config.RangeConfig, ) ([]models.Partition, error) { - result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn) - if err != nil { - return nil, err + var minValue, maxValue int64 + + if rangeConstraint.Min != nil && rangeConstraint.Max != nil { + minValue = *rangeConstraint.Min + maxValue = *rangeConstraint.Max + logrus.Infof("Column range for %s.%s.%s: [%d, %d] (user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue) + } else if rangeConstraint.Min != nil || rangeConstraint.Max != nil { + result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn) + if err != nil { + return nil, err + } + if rangeConstraint.Min != nil { + minValue = *rangeConstraint.Min + maxValue = result.Max + logrus.Infof("Column range for %s.%s.%s: [%d, %d] (min user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue) + } else { + minValue = result.Min + maxValue = *rangeConstraint.Max + logrus.Infof("Column range for %s.%s.%s: [%d, %d] (max user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue) + } + } else { + result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn) + if err != nil { + return nil, err + } + logrus.Infof("Column range for %s.%s.%s: [%d, %d]", tableInfo.Schema, tableInfo.Table, partitionColumn, result.Min, result.Max) + minValue = result.Min + maxValue = result.Max } - - logrus.Infof("Column range for %s.%s.%s: [%d, %d]", tableInfo.Schema, tableInfo.Table, partitionColumn, result.Min, result.Max) - - minValue := result.Min - maxValue := result.Max rangeSize := maxValue - minValue stepSize := int64(math.Ceil(float64(rangeSize) / float64(partitionsCount))) @@ -94,9 +113,7 @@ func calculatePartitionsEstimation( partitionMax = maxValue } - // Only the first partition has IsMinInclusive=true to avoid overlap isMinInclusive := i == 0 - partition := models.Partition{ Id: uuid.New(), HasRange: true, diff --git a/internal/app/etl/table_analyzers/main_test.go b/internal/app/etl/table_analyzers/main_test.go index a882a7f..d243bce 100644 --- a/internal/app/etl/table_analyzers/main_test.go +++ b/internal/app/etl/table_analyzers/main_test.go @@ -26,7 +26,7 @@ func (m *MockTableAnalyzer) QueryMaxMinFromColumn(ctx context.Context, tableInfo return etl.MaxMinColumnResult{Min: m.minValue, Max: m.maxValue}, nil } -func (m *MockTableAnalyzer) CalculatePartitionRanges(ctx context.Context, tableInfo config.TableInfo, partitionColumn string, maxPartitions int64) ([]models.Partition, error) { +func (m *MockTableAnalyzer) CalculatePartitionRanges(ctx context.Context, tableInfo config.TableInfo, partitionColumn string, maxPartitions int64, rangeConstraint config.RangeConfig) ([]models.Partition, error) { return nil, nil } @@ -35,7 +35,7 @@ func TestCalculatePartitionsEstimation_NoOverlap(t *testing.T) { mock := &MockTableAnalyzer{minValue: 0, maxValue: 100} tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} - partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 4) + partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 4, config.RangeConfig{}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -66,7 +66,7 @@ func TestCalculatePartitionsEstimation_CoverageComplete(t *testing.T) { mock := &MockTableAnalyzer{minValue: 1000, maxValue: 2000} tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} - partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 5) + partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 5, config.RangeConfig{}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -86,7 +86,7 @@ func TestCalculatePartitionsEstimation_FirstPartitionInclusive(t *testing.T) { mock := &MockTableAnalyzer{minValue: 50, maxValue: 70} tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} - partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 3) + partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 3, config.RangeConfig{}) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/internal/app/etl/table_analyzers/mssql.go b/internal/app/etl/table_analyzers/mssql.go index 3956c48..1c7dd5f 100644 --- a/internal/app/etl/table_analyzers/mssql.go +++ b/internal/app/etl/table_analyzers/mssql.go @@ -224,12 +224,37 @@ func (ta *MssqlTableAnalyzer) CalculatePartitionRanges( tableInfo config.TableInfo, partitionColumn string, maxPartitions int64, + rangeConstraint config.RangeConfig, ) ([]models.Partition, error) { + whereClause := "" + args := []any{sql.Named("maxPartitions", maxPartitions)} + + if rangeConstraint.Min != nil || rangeConstraint.Max != nil { + var conditions []string + if rangeConstraint.Min != nil { + minOp := ">" + if rangeConstraint.IsMinInclusive { + minOp = ">=" + } + conditions = append(conditions, fmt.Sprintf("[%s] %s @rangeMin", partitionColumn, minOp)) + args = append(args, sql.Named("rangeMin", *rangeConstraint.Min)) + } + if rangeConstraint.Max != nil { + maxOp := "<" + if rangeConstraint.IsMaxInclusive { + maxOp = "<=" + } + conditions = append(conditions, fmt.Sprintf("[%s] %s @rangeMax", partitionColumn, maxOp)) + args = append(args, sql.Named("rangeMax", *rangeConstraint.Max)) + } + whereClause = "WHERE " + strings.Join(conditions, " AND ") + } + query := fmt.Sprintf(` SELECT MIN([%s]) AS lower_limit, MAX([%s]) AS upper_limit -FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T +FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s] %s) AS T GROUP BY batch_id ORDER BY batch_id`, partitionColumn, @@ -237,12 +262,13 @@ ORDER BY batch_id`, partitionColumn, partitionColumn, tableInfo.Schema, - tableInfo.Table) + tableInfo.Table, + whereClause) ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() - rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) + rows, err := ta.db.Query(ctxTimeout, query, args...) if err != nil { return nil, err } diff --git a/internal/app/etl/table_analyzers/postgres.go b/internal/app/etl/table_analyzers/postgres.go index f5345ce..194eae4 100644 --- a/internal/app/etl/table_analyzers/postgres.go +++ b/internal/app/etl/table_analyzers/postgres.go @@ -177,6 +177,7 @@ func (ta *PostgresTableAnalyzer) CalculatePartitionRanges( tableInfo config.TableInfo, partitionColumn string, maxPartitions int64, + rangeConstraint config.RangeConfig, ) ([]models.Partition, error) { return []models.Partition{}, nil } diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index e489366..991c5b2 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -56,5 +56,6 @@ type TableAnalyzer interface { tableInfo config.TableInfo, partitionColumn string, maxPartitions int64, + rangeConstraint config.RangeConfig, ) ([]models.Partition, error) }