diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index efd4a9c..13c9275 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -104,6 +104,7 @@ func processMigrationJob( sourceTableAnalyzer, job.SourceTable.TableInfo, job.SourceTable.PrimaryKey, + job.PartitionCalculationStrategy, job.RowsPerPartition, job.Range, ) diff --git a/config.yaml b/config.yaml index aa9ac6f..cf52338 100644 --- a/config.yaml +++ b/config.yaml @@ -12,6 +12,7 @@ defaults: transformer_queue_size: 8 max_loaders: 4 loader_batch_size: 25000 + partition_calculation_strategy: EXACT # EXACT | ESTIMATION truncate_target: true truncate_method: TRUNCATE # TRUNCATE | DELETE retry: diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index 39309f7..bb90f36 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -27,20 +27,21 @@ type ToStorageConfig struct { } type JobConfig struct { - BatchesPerPartition int `yaml:"batches_per_partition"` - MaxExtractors int `yaml:"max_extractors"` - ExtractorBatchSize int `yaml:"extractor_batch_size"` - ExtractorQueueSize int `yaml:"extractor_queue_size"` - MaxTransformers int `yaml:"max_transformers"` - TransformerBatchSize int `yaml:"transformer_batch_size"` - TransformerQueueSize int `yaml:"transformer_queue_size"` - MaxLoaders int `yaml:"max_loaders"` - LoaderBatchSize int `yaml:"loader_batch_size"` - TruncateTarget bool `yaml:"truncate_target"` - TruncateMethod string `yaml:"truncate_method"` - Retry RetryConfig `yaml:"retry"` - RowsPerPartition int64 - ToStorage ToStorageConfig `yaml:"to_storage"` + BatchesPerPartition int `yaml:"batches_per_partition"` + MaxExtractors int `yaml:"max_extractors"` + ExtractorBatchSize int `yaml:"extractor_batch_size"` + ExtractorQueueSize int `yaml:"extractor_queue_size"` + MaxTransformers int `yaml:"max_transformers"` + TransformerBatchSize int `yaml:"transformer_batch_size"` + TransformerQueueSize int `yaml:"transformer_queue_size"` + MaxLoaders int `yaml:"max_loaders"` + LoaderBatchSize int `yaml:"loader_batch_size"` + PartitionCalculationStrategy string `yaml:"partition_calculation_strategy"` + TruncateTarget bool `yaml:"truncate_target"` + TruncateMethod string `yaml:"truncate_method"` + Retry RetryConfig `yaml:"retry"` + RowsPerPartition int64 + ToStorage ToStorageConfig `yaml:"to_storage"` } type FromJsonItem struct { diff --git a/internal/app/etl/table_analyzers/main.go b/internal/app/etl/table_analyzers/main.go index 2949c46..10c7f1a 100644 --- a/internal/app/etl/table_analyzers/main.go +++ b/internal/app/etl/table_analyzers/main.go @@ -2,6 +2,7 @@ package table_analyzers import ( "context" + "math" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" @@ -15,6 +16,7 @@ func PartitionRangeGenerator( tableAnalyzer etl.TableAnalyzer, tableInfo config.TableInfo, partitionColumn string, + partitionCalculationStrategy string, rowsPerPartition int64, jobRange config.RangeConfig, ) ([]models.Partition, error) { @@ -48,12 +50,67 @@ func PartitionRangeGenerator( } partitionsCount := rowsCount / rowsPerPartition + + if partitionCalculationStrategy == "ESTIMATION" { + return calculatePartitionsEstimation(ctx, tableAnalyzer, tableInfo, partitionColumn, partitionsCount) + } + partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount) if err != nil { return nil, err } - // logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table) + logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table) + + return partitions, nil +} + +func calculatePartitionsEstimation( + ctx context.Context, + tableAnalyzer etl.TableAnalyzer, + tableInfo config.TableInfo, + partitionColumn string, + partitionsCount int64, +) ([]models.Partition, error) { + result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn) + if err != nil { + return nil, err + } + + logrus.Infof("Column range for %s.%s.%s: [%d, %d]", tableInfo.Schema, tableInfo.Table, partitionColumn, result.Min, result.Max) + + minValue := result.Min + maxValue := result.Max + rangeSize := maxValue - minValue + stepSize := int64(math.Ceil(float64(rangeSize) / float64(partitionsCount))) + + partitions := make([]models.Partition, 0, partitionsCount) + + for i := range partitionsCount { + partitionMin := minValue + (i * stepSize) + partitionMax := minValue + ((i + 1) * stepSize) + + if i == partitionsCount-1 { + partitionMax = maxValue + } + + // Only the first partition has IsMinInclusive=true to avoid overlap + isMinInclusive := i == 0 + + partition := models.Partition{ + Id: uuid.New(), + HasRange: true, + RetryCounter: 0, + Range: models.PartitionRange{ + Min: partitionMin, + Max: partitionMax, + IsMinInclusive: isMinInclusive, + IsMaxInclusive: true, + }, + } + + partitions = append(partitions, partition) + } return partitions, nil } diff --git a/internal/app/etl/table_analyzers/main_test.go b/internal/app/etl/table_analyzers/main_test.go new file mode 100644 index 0000000..a882a7f --- /dev/null +++ b/internal/app/etl/table_analyzers/main_test.go @@ -0,0 +1,107 @@ +package table_analyzers + +import ( + "context" + "testing" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +type MockTableAnalyzer struct { + minValue int64 + maxValue int64 +} + +func (m *MockTableAnalyzer) QueryColumnTypes(ctx context.Context, tableInfo config.TableInfo) ([]models.ColumnType, error) { + return nil, nil +} + +func (m *MockTableAnalyzer) EstimateTotalRows(ctx context.Context, tableInfo config.TableInfo) (int64, error) { + return 0, nil +} + +func (m *MockTableAnalyzer) QueryMaxMinFromColumn(ctx context.Context, tableInfo config.TableInfo, columnName string) (etl.MaxMinColumnResult, error) { + return etl.MaxMinColumnResult{Min: m.minValue, Max: m.maxValue}, nil +} + +func (m *MockTableAnalyzer) CalculatePartitionRanges(ctx context.Context, tableInfo config.TableInfo, partitionColumn string, maxPartitions int64) ([]models.Partition, error) { + return nil, nil +} + +func TestCalculatePartitionsEstimation_NoOverlap(t *testing.T) { + ctx := context.Background() + mock := &MockTableAnalyzer{minValue: 0, maxValue: 100} + tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} + + partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 4) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(partitions) != 4 { + t.Errorf("expected 4 partitions, got %d", len(partitions)) + } + + for i := 0; i < len(partitions)-1; i++ { + current := partitions[i].Range + next := partitions[i+1].Range + + if current.Max == next.Min { + if current.IsMaxInclusive && next.IsMinInclusive { + t.Errorf("partition %d and %d overlap at value %d (both inclusive)", i, i+1, current.Max) + } + } + } + + t.Logf("Partitions generated:") + for i, p := range partitions { + t.Logf(" P%d: [%d, %d] (minInc=%v, maxInc=%v)", i, p.Range.Min, p.Range.Max, p.Range.IsMinInclusive, p.Range.IsMaxInclusive) + } +} + +func TestCalculatePartitionsEstimation_CoverageComplete(t *testing.T) { + ctx := context.Background() + mock := &MockTableAnalyzer{minValue: 1000, maxValue: 2000} + tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} + + partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if partitions[0].Range.Min != 1000 || !partitions[0].Range.IsMinInclusive { + t.Errorf("first partition should start at 1000 (inclusive), got %d (inclusive=%v)", + partitions[0].Range.Min, partitions[0].Range.IsMinInclusive) + } + + if partitions[len(partitions)-1].Range.Max != 2000 { + t.Errorf("last partition should end at 2000, got %d", partitions[len(partitions)-1].Range.Max) + } +} + +func TestCalculatePartitionsEstimation_FirstPartitionInclusive(t *testing.T) { + ctx := context.Background() + mock := &MockTableAnalyzer{minValue: 50, maxValue: 70} + tableInfo := config.TableInfo{Schema: "dbo", Table: "test"} + + partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 3) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !partitions[0].Range.IsMinInclusive { + t.Errorf("first partition should have IsMinInclusive=true") + } + + if partitions[0].Range.Min != 50 { + t.Errorf("first partition should start at 50, got %d", partitions[0].Range.Min) + } + + for i := 1; i < len(partitions); i++ { + if partitions[i].Range.IsMinInclusive { + t.Errorf("partition %d should have IsMinInclusive=false to avoid overlap", i) + } + } +} diff --git a/internal/app/etl/table_analyzers/mssql.go b/internal/app/etl/table_analyzers/mssql.go index 39f9daf..3956c48 100644 --- a/internal/app/etl/table_analyzers/mssql.go +++ b/internal/app/etl/table_analyzers/mssql.go @@ -196,6 +196,29 @@ GROUP BY t.name` return rowsCount, nil } +func (ta *MssqlTableAnalyzer) QueryMaxMinFromColumn( + ctx context.Context, + tableInfo config.TableInfo, + columnName string, +) (etl.MaxMinColumnResult, error) { + query := fmt.Sprintf(` +SELECT + MIN([%s]) AS min_value, + MAX([%s]) AS max_value +FROM [%s].[%s]`, columnName, columnName, tableInfo.Schema, tableInfo.Table) + + ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + + result := etl.MaxMinColumnResult{} + err := ta.db.QueryRow(ctxTimeout, query).Scan(&result.Min, &result.Max) + if err != nil { + return etl.MaxMinColumnResult{}, err + } + + return result, nil +} + func (ta *MssqlTableAnalyzer) CalculatePartitionRanges( ctx context.Context, tableInfo config.TableInfo, diff --git a/internal/app/etl/table_analyzers/postgres.go b/internal/app/etl/table_analyzers/postgres.go index 8aac15d..f5345ce 100644 --- a/internal/app/etl/table_analyzers/postgres.go +++ b/internal/app/etl/table_analyzers/postgres.go @@ -164,6 +164,14 @@ func (ta *PostgresTableAnalyzer) EstimateTotalRows( return 0, nil } +func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn( + ctx context.Context, + tableInfo config.TableInfo, + columnName string, +) (etl.MaxMinColumnResult, error) { + return etl.MaxMinColumnResult{}, nil +} + func (ta *PostgresTableAnalyzer) CalculatePartitionRanges( ctx context.Context, tableInfo config.TableInfo, diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index a1970ae..e489366 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -29,6 +29,11 @@ type Transformer interface { ) } +type MaxMinColumnResult struct { + Max int64 + Min int64 +} + type TableAnalyzer interface { QueryColumnTypes( ctx context.Context, @@ -40,6 +45,12 @@ type TableAnalyzer interface { tableInfo config.TableInfo, ) (int64, error) + QueryMaxMinFromColumn( + ctx context.Context, + tableInfo config.TableInfo, + columnName string, + ) (MaxMinColumnResult, error) + CalculatePartitionRanges( ctx context.Context, tableInfo config.TableInfo,