Compare commits
4 Commits
604702ef43
...
b98c998820
| Author | SHA1 | Date | |
|---|---|---|---|
|
b98c998820
|
|||
|
fe35d2a34c
|
|||
|
0784458106
|
|||
|
6f2e3e28f1
|
@@ -71,6 +71,7 @@ jobs:
|
||||
- source: DATA
|
||||
target: FILE_URL
|
||||
mode: REFERENCE_ONLY
|
||||
prefix: Infraestructura/SITE_HOLDER__ATTACH
|
||||
batches_per_partition: 20
|
||||
max_extractors: 32
|
||||
extractor_batch_size: 1
|
||||
|
||||
@@ -4,10 +4,13 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -72,16 +75,15 @@ func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []
|
||||
return "", ErrInvalidInput
|
||||
}
|
||||
|
||||
fullPath := blobPath
|
||||
if c.azureStorageConfig.Prefix != "" {
|
||||
fullPath, _ = url.JoinPath(c.azureStorageConfig.Prefix, blobPath)
|
||||
fullPath := path.Join(c.azureStorageConfig.Prefix, blobPath)
|
||||
|
||||
contentType := http.DetectContentType(buffer)
|
||||
opts := &azblob.UploadBufferOptions{
|
||||
HTTPHeaders: &blob.HTTPHeaders{BlobContentType: &contentType},
|
||||
}
|
||||
if _, err := c.client.UploadBuffer(ctx, c.azureStorageConfig.Container, fullPath, buffer, opts); err != nil {
|
||||
return "", fmt.Errorf("uploading blob %s: %w", fullPath, err)
|
||||
}
|
||||
|
||||
if err := c.UploadBuffer(ctx, c.azureStorageConfig.Container, fullPath, buffer); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
blobEndpoint, _ := url.JoinPath(c.azureStorageConfig.ServiceURL, c.azureStorageConfig.AccountName)
|
||||
blobURL, _ := url.JoinPath(blobEndpoint, c.azureStorageConfig.Container, fullPath)
|
||||
return blobURL, nil
|
||||
return fullPath, nil
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ type ToStorageColumnConfig struct {
|
||||
Source string `yaml:"source"`
|
||||
Target string `yaml:"target"`
|
||||
Mode string `yaml:"mode"`
|
||||
Prefix string `yaml:"prefix"`
|
||||
}
|
||||
|
||||
type ToStorageConfig struct {
|
||||
@@ -67,10 +68,10 @@ type TargetTableInfo struct {
|
||||
}
|
||||
|
||||
type RangeConfig struct {
|
||||
Min int64 `yaml:"min"`
|
||||
Max int64 `yaml:"max"`
|
||||
IsMinInclusive bool `yaml:"is_min_inclusive"`
|
||||
IsMaxInclusive bool `yaml:"is_max_inclusive"`
|
||||
Min *int64 `yaml:"min"`
|
||||
Max *int64 `yaml:"max"`
|
||||
IsMinInclusive bool `yaml:"is_min_inclusive"`
|
||||
IsMaxInclusive bool `yaml:"is_max_inclusive"`
|
||||
}
|
||||
|
||||
type Job struct {
|
||||
|
||||
@@ -76,6 +76,7 @@ func (ex *GenericExtractor) ProcessPartition(
|
||||
|
||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
||||
var rowsRead int64 = 0
|
||||
var lastRow models.UnknownRowValues
|
||||
|
||||
for rows.Next() {
|
||||
rowValues := make([]any, len(columns))
|
||||
@@ -98,6 +99,7 @@ func (ex *GenericExtractor) ProcessPartition(
|
||||
return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
rowsRead++
|
||||
lastRow = rowValues
|
||||
|
||||
batchRows = append(batchRows, rowValues)
|
||||
if len(batchRows) >= batchSize {
|
||||
@@ -114,5 +116,12 @@ func (ex *GenericExtractor) ProcessPartition(
|
||||
return rowsRead, err
|
||||
}
|
||||
|
||||
return rowsRead, rows.Err()
|
||||
if err := rows.Err(); err != nil {
|
||||
if lastRow != nil {
|
||||
return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
return rowsRead, err
|
||||
}
|
||||
|
||||
return rowsRead, nil
|
||||
}
|
||||
|
||||
@@ -20,20 +20,6 @@ func PartitionRangeGenerator(
|
||||
rowsPerPartition int64,
|
||||
jobRange config.RangeConfig,
|
||||
) ([]models.Partition, error) {
|
||||
if jobRange.Min > 0 {
|
||||
return []models.Partition{{
|
||||
Id: uuid.New(),
|
||||
HasRange: true,
|
||||
RetryCounter: 0,
|
||||
Range: models.PartitionRange{
|
||||
Min: jobRange.Min,
|
||||
Max: jobRange.Max,
|
||||
IsMinInclusive: jobRange.IsMinInclusive,
|
||||
IsMaxInclusive: jobRange.IsMaxInclusive,
|
||||
},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
|
||||
logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table)
|
||||
if err != nil {
|
||||
@@ -41,21 +27,33 @@ func PartitionRangeGenerator(
|
||||
}
|
||||
|
||||
if rowsCount <= rowsPerPartition {
|
||||
return []models.Partition{{
|
||||
Id: uuid.New(),
|
||||
HasRange: false,
|
||||
RetryCounter: 0,
|
||||
}}, nil
|
||||
|
||||
hasRange := jobRange.Min != nil || jobRange.Max != nil
|
||||
partition := models.Partition{Id: uuid.New(), HasRange: hasRange, RetryCounter: 0}
|
||||
if hasRange {
|
||||
var min, max int64
|
||||
if jobRange.Min != nil {
|
||||
min = *jobRange.Min
|
||||
}
|
||||
if jobRange.Max != nil {
|
||||
max = *jobRange.Max
|
||||
}
|
||||
partition.Range = models.PartitionRange{
|
||||
Min: min,
|
||||
Max: max,
|
||||
IsMinInclusive: jobRange.IsMinInclusive,
|
||||
IsMaxInclusive: jobRange.IsMaxInclusive,
|
||||
}
|
||||
}
|
||||
return []models.Partition{partition}, nil
|
||||
}
|
||||
|
||||
partitionsCount := rowsCount / rowsPerPartition
|
||||
|
||||
if partitionCalculationStrategy == "ESTIMATION" {
|
||||
return calculatePartitionsEstimation(ctx, tableAnalyzer, tableInfo, partitionColumn, partitionsCount)
|
||||
return calculatePartitionsEstimation(ctx, tableAnalyzer, tableInfo, partitionColumn, partitionsCount, jobRange)
|
||||
}
|
||||
|
||||
partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount)
|
||||
partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount, jobRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -71,16 +69,37 @@ func calculatePartitionsEstimation(
|
||||
tableInfo config.TableInfo,
|
||||
partitionColumn string,
|
||||
partitionsCount int64,
|
||||
rangeConstraint config.RangeConfig,
|
||||
) ([]models.Partition, error) {
|
||||
result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var minValue, maxValue int64
|
||||
|
||||
if rangeConstraint.Min != nil && rangeConstraint.Max != nil {
|
||||
minValue = *rangeConstraint.Min
|
||||
maxValue = *rangeConstraint.Max
|
||||
logrus.Infof("Column range for %s.%s.%s: [%d, %d] (user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue)
|
||||
} else if rangeConstraint.Min != nil || rangeConstraint.Max != nil {
|
||||
result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rangeConstraint.Min != nil {
|
||||
minValue = *rangeConstraint.Min
|
||||
maxValue = result.Max
|
||||
logrus.Infof("Column range for %s.%s.%s: [%d, %d] (min user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue)
|
||||
} else {
|
||||
minValue = result.Min
|
||||
maxValue = *rangeConstraint.Max
|
||||
logrus.Infof("Column range for %s.%s.%s: [%d, %d] (max user-defined)", tableInfo.Schema, tableInfo.Table, partitionColumn, minValue, maxValue)
|
||||
}
|
||||
} else {
|
||||
result, err := tableAnalyzer.QueryMaxMinFromColumn(ctx, tableInfo, partitionColumn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logrus.Infof("Column range for %s.%s.%s: [%d, %d]", tableInfo.Schema, tableInfo.Table, partitionColumn, result.Min, result.Max)
|
||||
minValue = result.Min
|
||||
maxValue = result.Max
|
||||
}
|
||||
|
||||
logrus.Infof("Column range for %s.%s.%s: [%d, %d]", tableInfo.Schema, tableInfo.Table, partitionColumn, result.Min, result.Max)
|
||||
|
||||
minValue := result.Min
|
||||
maxValue := result.Max
|
||||
rangeSize := maxValue - minValue
|
||||
stepSize := int64(math.Ceil(float64(rangeSize) / float64(partitionsCount)))
|
||||
|
||||
@@ -94,9 +113,7 @@ func calculatePartitionsEstimation(
|
||||
partitionMax = maxValue
|
||||
}
|
||||
|
||||
// Only the first partition has IsMinInclusive=true to avoid overlap
|
||||
isMinInclusive := i == 0
|
||||
|
||||
partition := models.Partition{
|
||||
Id: uuid.New(),
|
||||
HasRange: true,
|
||||
|
||||
@@ -10,32 +10,39 @@ import (
|
||||
)
|
||||
|
||||
type MockTableAnalyzer struct {
|
||||
minValue int64
|
||||
maxValue int64
|
||||
minValue int64
|
||||
maxValue int64
|
||||
totalRows int64
|
||||
capturedRangeConstraint config.RangeConfig
|
||||
}
|
||||
|
||||
func (m *MockTableAnalyzer) QueryColumnTypes(ctx context.Context, tableInfo config.TableInfo) ([]models.ColumnType, error) {
|
||||
func (m *MockTableAnalyzer) QueryColumnTypes(_ context.Context, _ config.TableInfo) ([]models.ColumnType, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *MockTableAnalyzer) EstimateTotalRows(ctx context.Context, tableInfo config.TableInfo) (int64, error) {
|
||||
return 0, nil
|
||||
func (m *MockTableAnalyzer) EstimateTotalRows(_ context.Context, _ config.TableInfo) (int64, error) {
|
||||
return m.totalRows, nil
|
||||
}
|
||||
|
||||
func (m *MockTableAnalyzer) QueryMaxMinFromColumn(ctx context.Context, tableInfo config.TableInfo, columnName string) (etl.MaxMinColumnResult, error) {
|
||||
func (m *MockTableAnalyzer) QueryMaxMinFromColumn(_ context.Context, _ config.TableInfo, _ string) (etl.MaxMinColumnResult, error) {
|
||||
return etl.MaxMinColumnResult{Min: m.minValue, Max: m.maxValue}, nil
|
||||
}
|
||||
|
||||
func (m *MockTableAnalyzer) CalculatePartitionRanges(ctx context.Context, tableInfo config.TableInfo, partitionColumn string, maxPartitions int64) ([]models.Partition, error) {
|
||||
return nil, nil
|
||||
func (m *MockTableAnalyzer) CalculatePartitionRanges(_ context.Context, _ config.TableInfo, _ string, _ int64, rangeConstraint config.RangeConfig) ([]models.Partition, error) {
|
||||
m.capturedRangeConstraint = rangeConstraint
|
||||
return []models.Partition{}, nil
|
||||
}
|
||||
|
||||
//go:fix inline
|
||||
func ptr64(v int64) *int64 { return new(v) }
|
||||
|
||||
var testTableInfo = config.TableInfo{Schema: "dbo", Table: "test"}
|
||||
|
||||
func TestCalculatePartitionsEstimation_NoOverlap(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{minValue: 0, maxValue: 100}
|
||||
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
|
||||
|
||||
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 4)
|
||||
partitions, err := calculatePartitionsEstimation(ctx, mock, testTableInfo, "id", 4, config.RangeConfig{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -47,26 +54,17 @@ func TestCalculatePartitionsEstimation_NoOverlap(t *testing.T) {
|
||||
for i := 0; i < len(partitions)-1; i++ {
|
||||
current := partitions[i].Range
|
||||
next := partitions[i+1].Range
|
||||
|
||||
if current.Max == next.Min {
|
||||
if current.IsMaxInclusive && next.IsMinInclusive {
|
||||
t.Errorf("partition %d and %d overlap at value %d (both inclusive)", i, i+1, current.Max)
|
||||
}
|
||||
if current.Max == next.Min && current.IsMaxInclusive && next.IsMinInclusive {
|
||||
t.Errorf("partition %d and %d overlap at value %d (both inclusive)", i, i+1, current.Max)
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("Partitions generated:")
|
||||
for i, p := range partitions {
|
||||
t.Logf(" P%d: [%d, %d] (minInc=%v, maxInc=%v)", i, p.Range.Min, p.Range.Max, p.Range.IsMinInclusive, p.Range.IsMaxInclusive)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCalculatePartitionsEstimation_CoverageComplete(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{minValue: 1000, maxValue: 2000}
|
||||
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
|
||||
|
||||
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 5)
|
||||
partitions, err := calculatePartitionsEstimation(ctx, mock, testTableInfo, "id", 5, config.RangeConfig{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -84,9 +82,8 @@ func TestCalculatePartitionsEstimation_CoverageComplete(t *testing.T) {
|
||||
func TestCalculatePartitionsEstimation_FirstPartitionInclusive(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{minValue: 50, maxValue: 70}
|
||||
tableInfo := config.TableInfo{Schema: "dbo", Table: "test"}
|
||||
|
||||
partitions, err := calculatePartitionsEstimation(ctx, mock, tableInfo, "id", 3)
|
||||
partitions, err := calculatePartitionsEstimation(ctx, mock, testTableInfo, "id", 3, config.RangeConfig{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
@@ -105,3 +102,231 @@ func TestCalculatePartitionsEstimation_FirstPartitionInclusive(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_Exact_NoRange_PassesEmptyConstraint(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 1000}
|
||||
|
||||
_, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "EXACT", 100, config.RangeConfig{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if mock.capturedRangeConstraint.Min != nil || mock.capturedRangeConstraint.Max != nil {
|
||||
t.Errorf("expected empty range constraint, got min=%v max=%v",
|
||||
mock.capturedRangeConstraint.Min, mock.capturedRangeConstraint.Max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_Exact_BothBounds_PassesBothToAnalyzer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 1000}
|
||||
jobRange := config.RangeConfig{Min: ptr64(200), Max: ptr64(800), IsMinInclusive: true, IsMaxInclusive: true}
|
||||
|
||||
_, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "EXACT", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
rc := mock.capturedRangeConstraint
|
||||
if rc.Min == nil || *rc.Min != 200 {
|
||||
t.Errorf("expected Min=200, got %v", rc.Min)
|
||||
}
|
||||
if rc.Max == nil || *rc.Max != 800 {
|
||||
t.Errorf("expected Max=800, got %v", rc.Max)
|
||||
}
|
||||
if !rc.IsMinInclusive || !rc.IsMaxInclusive {
|
||||
t.Errorf("expected both bounds inclusive, got minInc=%v maxInc=%v", rc.IsMinInclusive, rc.IsMaxInclusive)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_Exact_MinOnly_PassesMinNilMax(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 1000}
|
||||
jobRange := config.RangeConfig{Min: ptr64(500)}
|
||||
|
||||
_, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "EXACT", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
rc := mock.capturedRangeConstraint
|
||||
if rc.Min == nil || *rc.Min != 500 {
|
||||
t.Errorf("expected Min=500, got %v", rc.Min)
|
||||
}
|
||||
if rc.Max != nil {
|
||||
t.Errorf("expected Max=nil (no upper bound), got %v", rc.Max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_Exact_MaxOnly_PassesMaxNilMin(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 1000}
|
||||
jobRange := config.RangeConfig{Max: ptr64(300)}
|
||||
|
||||
_, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "EXACT", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
rc := mock.capturedRangeConstraint
|
||||
if rc.Min != nil {
|
||||
t.Errorf("expected Min=nil (no lower bound), got %v", rc.Min)
|
||||
}
|
||||
if rc.Max == nil || *rc.Max != 300 {
|
||||
t.Errorf("expected Max=300, got %v", rc.Max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_Estimation_BothBounds_UsesUserRange(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
// DB min/max differ intentionally — user bounds should take precedence.
|
||||
mock := &MockTableAnalyzer{totalRows: 1000, minValue: 0, maxValue: 999}
|
||||
jobRange := config.RangeConfig{Min: ptr64(200), Max: ptr64(700), IsMinInclusive: true}
|
||||
|
||||
partitions, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "ESTIMATION", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(partitions) == 0 {
|
||||
t.Fatal("expected at least one partition")
|
||||
}
|
||||
|
||||
if partitions[0].Range.Min != 200 {
|
||||
t.Errorf("first partition should start at user min=200, got %d", partitions[0].Range.Min)
|
||||
}
|
||||
if partitions[len(partitions)-1].Range.Max != 700 {
|
||||
t.Errorf("last partition should end at user max=700, got %d", partitions[len(partitions)-1].Range.Max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_Estimation_MinOnly_QueriesDBForMax(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 1000, minValue: 0, maxValue: 999}
|
||||
jobRange := config.RangeConfig{Min: ptr64(500), IsMinInclusive: true}
|
||||
|
||||
partitions, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "ESTIMATION", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(partitions) == 0 {
|
||||
t.Fatal("expected at least one partition")
|
||||
}
|
||||
|
||||
if partitions[0].Range.Min != 500 {
|
||||
t.Errorf("first partition should start at user min=500, got %d", partitions[0].Range.Min)
|
||||
}
|
||||
if partitions[len(partitions)-1].Range.Max != 999 {
|
||||
t.Errorf("last partition should end at DB max=999, got %d", partitions[len(partitions)-1].Range.Max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_Estimation_MaxOnly_QueriesDBForMin(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 1000, minValue: 100, maxValue: 999}
|
||||
jobRange := config.RangeConfig{Max: ptr64(600), IsMaxInclusive: true}
|
||||
|
||||
partitions, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "ESTIMATION", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(partitions) == 0 {
|
||||
t.Fatal("expected at least one partition")
|
||||
}
|
||||
|
||||
if partitions[0].Range.Min != 100 {
|
||||
t.Errorf("first partition should start at DB min=100, got %d", partitions[0].Range.Min)
|
||||
}
|
||||
if partitions[len(partitions)-1].Range.Max != 600 {
|
||||
t.Errorf("last partition should end at user max=600, got %d", partitions[len(partitions)-1].Range.Max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_SinglePartition_NoRange(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 50}
|
||||
|
||||
partitions, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "EXACT", 100, config.RangeConfig{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(partitions) != 1 {
|
||||
t.Fatalf("expected 1 partition, got %d", len(partitions))
|
||||
}
|
||||
if partitions[0].HasRange {
|
||||
t.Error("single partition with no range should have HasRange=false")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_SinglePartition_BothBounds(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 50}
|
||||
jobRange := config.RangeConfig{Min: ptr64(100), Max: ptr64(200), IsMinInclusive: true, IsMaxInclusive: true}
|
||||
|
||||
partitions, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "EXACT", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(partitions) != 1 {
|
||||
t.Fatalf("expected 1 partition, got %d", len(partitions))
|
||||
}
|
||||
p := partitions[0]
|
||||
if !p.HasRange {
|
||||
t.Error("expected HasRange=true")
|
||||
}
|
||||
if p.Range.Min != 100 || p.Range.Max != 200 {
|
||||
t.Errorf("expected [100, 200], got [%d, %d]", p.Range.Min, p.Range.Max)
|
||||
}
|
||||
if !p.Range.IsMinInclusive || !p.Range.IsMaxInclusive {
|
||||
t.Errorf("expected both inclusive, got minInc=%v maxInc=%v", p.Range.IsMinInclusive, p.Range.IsMaxInclusive)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_SinglePartition_MinOnly(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 50}
|
||||
jobRange := config.RangeConfig{Min: ptr64(100), IsMinInclusive: true}
|
||||
|
||||
partitions, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "EXACT", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
p := partitions[0]
|
||||
if !p.HasRange {
|
||||
t.Error("expected HasRange=true")
|
||||
}
|
||||
if p.Range.Min != 100 {
|
||||
t.Errorf("expected Min=100, got %d", p.Range.Min)
|
||||
}
|
||||
if p.Range.Max != 0 {
|
||||
t.Errorf("expected Max=0 (no upper bound), got %d", p.Range.Max)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartitionRangeGenerator_SinglePartition_MaxOnly(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mock := &MockTableAnalyzer{totalRows: 50}
|
||||
jobRange := config.RangeConfig{Max: ptr64(200), IsMaxInclusive: true}
|
||||
|
||||
partitions, err := PartitionRangeGenerator(ctx, mock, testTableInfo, "id", "EXACT", 100, jobRange)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
p := partitions[0]
|
||||
if !p.HasRange {
|
||||
t.Error("expected HasRange=true")
|
||||
}
|
||||
if p.Range.Min != 0 {
|
||||
t.Errorf("expected Min=0 (no lower bound), got %d", p.Range.Min)
|
||||
}
|
||||
if p.Range.Max != 200 {
|
||||
t.Errorf("expected Max=200, got %d", p.Range.Max)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,12 +224,37 @@ func (ta *MssqlTableAnalyzer) CalculatePartitionRanges(
|
||||
tableInfo config.TableInfo,
|
||||
partitionColumn string,
|
||||
maxPartitions int64,
|
||||
rangeConstraint config.RangeConfig,
|
||||
) ([]models.Partition, error) {
|
||||
whereClause := ""
|
||||
args := []any{sql.Named("maxPartitions", maxPartitions)}
|
||||
|
||||
if rangeConstraint.Min != nil || rangeConstraint.Max != nil {
|
||||
var conditions []string
|
||||
if rangeConstraint.Min != nil {
|
||||
minOp := ">"
|
||||
if rangeConstraint.IsMinInclusive {
|
||||
minOp = ">="
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("[%s] %s @rangeMin", partitionColumn, minOp))
|
||||
args = append(args, sql.Named("rangeMin", *rangeConstraint.Min))
|
||||
}
|
||||
if rangeConstraint.Max != nil {
|
||||
maxOp := "<"
|
||||
if rangeConstraint.IsMaxInclusive {
|
||||
maxOp = "<="
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("[%s] %s @rangeMax", partitionColumn, maxOp))
|
||||
args = append(args, sql.Named("rangeMax", *rangeConstraint.Max))
|
||||
}
|
||||
whereClause = "WHERE " + strings.Join(conditions, " AND ")
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
MIN([%s]) AS lower_limit,
|
||||
MAX([%s]) AS upper_limit
|
||||
FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T
|
||||
FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s] %s) AS T
|
||||
GROUP BY batch_id
|
||||
ORDER BY batch_id`,
|
||||
partitionColumn,
|
||||
@@ -237,12 +262,13 @@ ORDER BY batch_id`,
|
||||
partitionColumn,
|
||||
partitionColumn,
|
||||
tableInfo.Schema,
|
||||
tableInfo.Table)
|
||||
tableInfo.Table,
|
||||
whereClause)
|
||||
|
||||
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
|
||||
rows, err := ta.db.Query(ctxTimeout, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -177,6 +177,7 @@ func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
|
||||
tableInfo config.TableInfo,
|
||||
partitionColumn string,
|
||||
maxPartitions int64,
|
||||
rangeConstraint config.RangeConfig,
|
||||
) ([]models.Partition, error) {
|
||||
return []models.Partition{}, nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package transformers
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -99,12 +100,11 @@ func computeStorageTransformationPlan(
|
||||
}
|
||||
b, ok := v.([]byte)
|
||||
if !ok {
|
||||
logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
|
||||
schema, table, sourceColName, v)
|
||||
logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through", schema, table, sourceColName, v)
|
||||
return v, nil
|
||||
}
|
||||
// start := time.Now()
|
||||
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
|
||||
blobPath := path.Join(storageCol.Prefix, uuid.New().String())
|
||||
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
|
||||
if err != nil {
|
||||
return nil, &custom_errors.JobError{
|
||||
|
||||
@@ -56,5 +56,6 @@ type TableAnalyzer interface {
|
||||
tableInfo config.TableInfo,
|
||||
partitionColumn string,
|
||||
maxPartitions int64,
|
||||
rangeConstraint config.RangeConfig,
|
||||
) ([]models.Partition, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user