2 Commits

8 changed files with 41 additions and 23 deletions

View File

@@ -30,6 +30,11 @@ jobs:
table: MANZANA table: MANZANA
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto - name: red_puerto
enabled: true enabled: true

View File

@@ -50,6 +50,12 @@ type Job struct {
PreSQL []string `yaml:"pre_sql"` PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"` PostSQL []string `yaml:"post_sql"`
JobConfig `yaml:",inline"` JobConfig `yaml:",inline"`
Range struct {
Min int64 `yaml:"min"`
Max int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
} }
type MigrationConfig struct { type MigrationConfig struct {

View File

@@ -103,8 +103,8 @@ func ExtractorErrorHandler(
if err.HasLastId { if err.HasLastId {
newPartition.ParentId = err.Partition.Id newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New() newPartition.Id = uuid.New()
newPartition.LowerLimit = err.LastId newPartition.Range.Min = err.LastId
newPartition.IsLowerLimitInclusive = false newPartition.Range.IsMinInclusive = false
} }
requeueWithBackoff(ctx, delay, func() { requeueWithBackoff(ctx, delay, func() {

View File

@@ -107,13 +107,13 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int, error) { ) (int, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive) query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
var queryArgs []any var queryArgs []any
if partition.ShouldUseRange { if partition.HasRange {
queryArgs = append(queryArgs, queryArgs = append(queryArgs,
sql.Named("min", partition.LowerLimit), sql.Named("min", partition.Range.Min),
sql.Named("max", partition.UpperLimit), sql.Named("max", partition.Range.Max),
) )
} }

View File

@@ -62,7 +62,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
) (int, error) { ) (int, error) {
query := buildExtractQueryPostgres(tableInfo, columns) query := buildExtractQueryPostgres(tableInfo, columns)
if partition.ShouldUseRange { if partition.HasRange {
return 0, errors.New("Batch config not yet supported") return 0, errors.New("Batch config not yet supported")
} }

View File

@@ -23,9 +23,9 @@ func PartitionRangeGenerator(
if rowsCount <= rowsPerPartition { if rowsCount <= rowsPerPartition {
return []models.Partition{{ return []models.Partition{{
Id: uuid.New(), Id: uuid.New(),
ShouldUseRange: false, HasRange: false,
RetryCounter: 0, RetryCounter: 0,
}}, nil }}, nil
} }

View File

@@ -228,13 +228,15 @@ ORDER BY batch_id`,
for rows.Next() { for rows.Next() {
partition := models.Partition{ partition := models.Partition{
Id: uuid.New(), Id: uuid.New(),
ShouldUseRange: true, HasRange: true,
RetryCounter: 0, RetryCounter: 0,
IsLowerLimitInclusive: true, Range: models.PartitionRange{
IsMinInclusive: true,
},
} }
if err := rows.Scan(&partition.LowerLimit, &partition.UpperLimit); err != nil { if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
return nil, err return nil, err
} }

View File

@@ -11,12 +11,17 @@ type Batch struct {
RetryCounter int RetryCounter int
} }
type Partition struct { type PartitionRange struct {
Id uuid.UUID Min int64
ParentId uuid.UUID Max int64
LowerLimit int64 IsMinInclusive bool
UpperLimit int64 IsMaxInclusive bool
IsLowerLimitInclusive bool }
ShouldUseRange bool
RetryCounter int type Partition struct {
Id uuid.UUID
ParentId uuid.UUID
Range PartitionRange
HasRange bool
RetryCounter int
} }