refactor: add partition calculation strategy and implement estimation logic; enhance table analyzers for max/min column queries

This commit is contained in:
2026-05-09 10:35:36 -05:00
parent 8f8d2d11a4
commit 723041ff2c
8 changed files with 224 additions and 15 deletions

View File

@@ -104,6 +104,7 @@ func processMigrationJob(
sourceTableAnalyzer, sourceTableAnalyzer,
job.SourceTable.TableInfo, job.SourceTable.TableInfo,
job.SourceTable.PrimaryKey, job.SourceTable.PrimaryKey,
job.PartitionCalculationStrategy,
job.RowsPerPartition, job.RowsPerPartition,
job.Range, job.Range,
) )

View File

@@ -12,6 +12,7 @@ defaults:
transformer_queue_size: 8 transformer_queue_size: 8
max_loaders: 4 max_loaders: 4
loader_batch_size: 25000 loader_batch_size: 25000
partition_calculation_strategy: EXACT # EXACT | ESTIMATION
truncate_target: true truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE truncate_method: TRUNCATE # TRUNCATE | DELETE
retry: retry:

View File

@@ -27,20 +27,21 @@ type ToStorageConfig struct {
} }
type JobConfig struct { type JobConfig struct {
BatchesPerPartition int `yaml:"batches_per_partition"` BatchesPerPartition int `yaml:"batches_per_partition"`
MaxExtractors int `yaml:"max_extractors"` MaxExtractors int `yaml:"max_extractors"`
ExtractorBatchSize int `yaml:"extractor_batch_size"` ExtractorBatchSize int `yaml:"extractor_batch_size"`
ExtractorQueueSize int `yaml:"extractor_queue_size"` ExtractorQueueSize int `yaml:"extractor_queue_size"`
MaxTransformers int `yaml:"max_transformers"` MaxTransformers int `yaml:"max_transformers"`
TransformerBatchSize int `yaml:"transformer_batch_size"` TransformerBatchSize int `yaml:"transformer_batch_size"`
TransformerQueueSize int `yaml:"transformer_queue_size"` TransformerQueueSize int `yaml:"transformer_queue_size"`
MaxLoaders int `yaml:"max_loaders"` MaxLoaders int `yaml:"max_loaders"`
LoaderBatchSize int `yaml:"loader_batch_size"` LoaderBatchSize int `yaml:"loader_batch_size"`
TruncateTarget bool `yaml:"truncate_target"` PartitionCalculationStrategy string `yaml:"partition_calculation_strategy"`
TruncateMethod string `yaml:"truncate_method"` TruncateTarget bool `yaml:"truncate_target"`
Retry RetryConfig `yaml:"retry"` TruncateMethod string `yaml:"truncate_method"`
RowsPerPartition int64 Retry RetryConfig `yaml:"retry"`
ToStorage ToStorageConfig `yaml:"to_storage"` RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
} }
type FromJsonItem struct { type FromJsonItem struct {

View File

@@ -2,6 +2,7 @@ package table_analyzers
import ( import (
"context" "context"
"math"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
@@ -15,6 +16,7 @@ func PartitionRangeGenerator(
tableAnalyzer etl.TableAnalyzer, tableAnalyzer etl.TableAnalyzer,
tableInfo config.TableInfo, tableInfo config.TableInfo,
partitionColumn string, partitionColumn string,
partitionCalculationStrategy string,
rowsPerPartition int64, rowsPerPartition int64,
jobRange config.RangeConfig, jobRange config.RangeConfig,
) ([]models.Partition, error) { ) ([]models.Partition, error) {
@@ -48,12 +50,67 @@ func PartitionRangeGenerator(
} }
partitionsCount := rowsCount / rowsPerPartition partitionsCount := rowsCount / rowsPerPartition
if partitionCalculationStrategy == "ESTIMATION" {
return calculatePartitionsEstimation(ctx, tableAnalyzer, tableInfo, partitionColumn, partitionsCount)
}
partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount) partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table) logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table)
return partitions, nil
}
func calculatePartitionsEstimation(
ctx context.Context,
tableAnalyzer etl.TableAnalyzer,
tableInfo config.TableInfo,
partitionColumn string,
partitionsCount int64,
) ([]models.Partition, error) {
result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn)
if err != nil {
return nil, err
}
logrus.Infof("Column range for %s.%s.%s: [%d, %d]", tableInfo.Schema, tableInfo.Table, partitionColumn, result.Min, result.Max)
minValue := result.Min
maxValue := result.Max
rangeSize := maxValue - minValue
stepSize := int64(math.Ceil(float64(rangeSize) / float64(partitionsCount)))
partitions := make([]models.Partition, 0, partitionsCount)
for i := range partitionsCount {
partitionMin := minValue + (i * stepSize)
partitionMax := minValue + ((i + 1) * stepSize)
if i == partitionsCount-1 {
partitionMax = maxValue
}
// Only the first partition has IsMinInclusive=true to avoid overlap
isMinInclusive := i == 0
partition := models.Partition{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
Min: partitionMin,
Max: partitionMax,
IsMinInclusive: isMinInclusive,
IsMaxInclusive: true,
},
}
partitions = append(partitions, partition)
}
return partitions, nil return partitions, nil
} }

View File

@@ -0,0 +1,107 @@
package table_analyzers
import (
"context"
"testing"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type MockTableAnalyzer struct {
minValue int64
maxValue int64
}
func (m *MockTableAnalyzer) QueryColumnTypes(ctx context.Context, tableInfo config.TableInfo) ([]models.ColumnType, error) {
return nil, nil
}
func (m *MockTableAnalyzer) EstimateTotalRows(ctx context.Context, tableInfo config.TableInfo) (int64, error) {
return 0, nil
}
func (m *MockTableAnalyzer) QueryMaxMinFromColumn(ctx context.Context, tableInfo config.TableInfo, columnName string) (etl.MaxMinColumnResult, error) {
return etl.MaxMinColumnResult{Min: m.minValue, Max: m.maxValue}, nil
}
func (m *MockTableAnalyzer) CalculatePartitionRanges(ctx context.Context, tableInfo config.TableInfo, partitionColumn string, maxPartitions int64) ([]models.Partition, error) {
return nil, nil
}
func TestCalculatePartitionsEstimation_NoOverlap(t *testing.T) {
ctx := context.Background()
mock := &MockTableAnalyzer{minValue: 0, maxValue: 100}
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 4)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(partitions) != 4 {
t.Errorf("expected 4 partitions, got %d", len(partitions))
}
for i := 0; i < len(partitions)-1; i++ {
current := partitions[i].Range
next := partitions[i+1].Range
if current.Max == next.Min {
if current.IsMaxInclusive && next.IsMinInclusive {
t.Errorf("partition %d and %d overlap at value %d (both inclusive)", i, i+1, current.Max)
}
}
}
t.Logf("Partitions generated:")
for i, p := range partitions {
t.Logf(" P%d: [%d, %d] (minInc=%v, maxInc=%v)", i, p.Range.Min, p.Range.Max, p.Range.IsMinInclusive, p.Range.IsMaxInclusive)
}
}
func TestCalculatePartitionsEstimation_CoverageComplete(t *testing.T) {
ctx := context.Background()
mock := &MockTableAnalyzer{minValue: 1000, maxValue: 2000}
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 5)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if partitions[0].Range.Min != 1000 || !partitions[0].Range.IsMinInclusive {
t.Errorf("first partition should start at 1000 (inclusive), got %d (inclusive=%v)",
partitions[0].Range.Min, partitions[0].Range.IsMinInclusive)
}
if partitions[len(partitions)-1].Range.Max != 2000 {
t.Errorf("last partition should end at 2000, got %d", partitions[len(partitions)-1].Range.Max)
}
}
func TestCalculatePartitionsEstimation_FirstPartitionInclusive(t *testing.T) {
ctx := context.Background()
mock := &MockTableAnalyzer{minValue: 50, maxValue: 70}
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 3)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !partitions[0].Range.IsMinInclusive {
t.Errorf("first partition should have IsMinInclusive=true")
}
if partitions[0].Range.Min != 50 {
t.Errorf("first partition should start at 50, got %d", partitions[0].Range.Min)
}
for i := 1; i < len(partitions); i++ {
if partitions[i].Range.IsMinInclusive {
t.Errorf("partition %d should have IsMinInclusive=false to avoid overlap", i)
}
}
}

View File

@@ -196,6 +196,29 @@ GROUP BY t.name`
return rowsCount, nil return rowsCount, nil
} }
func (ta *MssqlTableAnalyzer) QueryMaxMinFromColumn(
ctx context.Context,
tableInfo config.TableInfo,
columnName string,
) (etl.MaxMinColumnResult, error) {
query := fmt.Sprintf(`
SELECT
MIN([%s]) AS min_value,
MAX([%s]) AS max_value
FROM [%s].[%s]`, columnName, columnName, tableInfo.Schema, tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
result := etl.MaxMinColumnResult{}
err := ta.db.QueryRow(ctxTimeout, query).Scan(&result.Min, &result.Max)
if err != nil {
return etl.MaxMinColumnResult{}, err
}
return result, nil
}
func (ta *MssqlTableAnalyzer) CalculatePartitionRanges( func (ta *MssqlTableAnalyzer) CalculatePartitionRanges(
ctx context.Context, ctx context.Context,
tableInfo config.TableInfo, tableInfo config.TableInfo,

View File

@@ -164,6 +164,14 @@ func (ta *PostgresTableAnalyzer) EstimateTotalRows(
return 0, nil return 0, nil
} }
func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn(
ctx context.Context,
tableInfo config.TableInfo,
columnName string,
) (etl.MaxMinColumnResult, error) {
return etl.MaxMinColumnResult{}, nil
}
func (ta *PostgresTableAnalyzer) CalculatePartitionRanges( func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
ctx context.Context, ctx context.Context,
tableInfo config.TableInfo, tableInfo config.TableInfo,

View File

@@ -29,6 +29,11 @@ type Transformer interface {
) )
} }
type MaxMinColumnResult struct {
Max int64
Min int64
}
type TableAnalyzer interface { type TableAnalyzer interface {
QueryColumnTypes( QueryColumnTypes(
ctx context.Context, ctx context.Context,
@@ -40,6 +45,12 @@ type TableAnalyzer interface {
tableInfo config.TableInfo, tableInfo config.TableInfo,
) (int64, error) ) (int64, error)
QueryMaxMinFromColumn(
ctx context.Context,
tableInfo config.TableInfo,
columnName string,
) (MaxMinColumnResult, error)
CalculatePartitionRanges( CalculatePartitionRanges(
ctx context.Context, ctx context.Context,
tableInfo config.TableInfo, tableInfo config.TableInfo,