Compare commits
2 Commits
ed889b740a
...
0384d5423f
| Author | SHA1 | Date | |
|---|---|---|---|
|
0384d5423f
|
|||
|
1ce3d9e153
|
@@ -30,6 +30,11 @@ jobs:
|
||||
table: MANZANA
|
||||
pre_sql:
|
||||
- 'SELECT 1'
|
||||
range:
|
||||
min: 1000000
|
||||
max: 2000000
|
||||
is_min_inclusive: false
|
||||
is_max_inclusive: true
|
||||
|
||||
- name: red_puerto
|
||||
enabled: true
|
||||
|
||||
@@ -50,6 +50,12 @@ type Job struct {
|
||||
PreSQL []string `yaml:"pre_sql"`
|
||||
PostSQL []string `yaml:"post_sql"`
|
||||
JobConfig `yaml:",inline"`
|
||||
Range struct {
|
||||
Min int64 `yaml:"min"`
|
||||
Max int64 `yaml:"max"`
|
||||
IsMinInclusive bool `yaml:"is_min_inclusive"`
|
||||
IsMaxInclusive bool `yaml:"is_max_inclusive"`
|
||||
}
|
||||
}
|
||||
|
||||
type MigrationConfig struct {
|
||||
|
||||
@@ -103,8 +103,8 @@ func ExtractorErrorHandler(
|
||||
if err.HasLastId {
|
||||
newPartition.ParentId = err.Partition.Id
|
||||
newPartition.Id = uuid.New()
|
||||
newPartition.LowerLimit = err.LastId
|
||||
newPartition.IsLowerLimitInclusive = false
|
||||
newPartition.Range.Min = err.LastId
|
||||
newPartition.Range.IsMinInclusive = false
|
||||
}
|
||||
|
||||
requeueWithBackoff(ctx, delay, func() {
|
||||
|
||||
@@ -107,13 +107,13 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int, error) {
|
||||
query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive)
|
||||
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
|
||||
|
||||
var queryArgs []any
|
||||
if partition.ShouldUseRange {
|
||||
if partition.HasRange {
|
||||
queryArgs = append(queryArgs,
|
||||
sql.Named("min", partition.LowerLimit),
|
||||
sql.Named("max", partition.UpperLimit),
|
||||
sql.Named("min", partition.Range.Min),
|
||||
sql.Named("max", partition.Range.Max),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
|
||||
) (int, error) {
|
||||
query := buildExtractQueryPostgres(tableInfo, columns)
|
||||
|
||||
if partition.ShouldUseRange {
|
||||
if partition.HasRange {
|
||||
return 0, errors.New("Batch config not yet supported")
|
||||
}
|
||||
|
||||
|
||||
@@ -23,9 +23,9 @@ func PartitionRangeGenerator(
|
||||
|
||||
if rowsCount <= rowsPerPartition {
|
||||
return []models.Partition{{
|
||||
Id: uuid.New(),
|
||||
ShouldUseRange: false,
|
||||
RetryCounter: 0,
|
||||
Id: uuid.New(),
|
||||
HasRange: false,
|
||||
RetryCounter: 0,
|
||||
}}, nil
|
||||
|
||||
}
|
||||
|
||||
@@ -228,13 +228,15 @@ ORDER BY batch_id`,
|
||||
|
||||
for rows.Next() {
|
||||
partition := models.Partition{
|
||||
Id: uuid.New(),
|
||||
ShouldUseRange: true,
|
||||
RetryCounter: 0,
|
||||
IsLowerLimitInclusive: true,
|
||||
Id: uuid.New(),
|
||||
HasRange: true,
|
||||
RetryCounter: 0,
|
||||
Range: models.PartitionRange{
|
||||
IsMinInclusive: true,
|
||||
},
|
||||
}
|
||||
|
||||
if err := rows.Scan(&partition.LowerLimit, &partition.UpperLimit); err != nil {
|
||||
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -11,12 +11,17 @@ type Batch struct {
|
||||
RetryCounter int
|
||||
}
|
||||
|
||||
type Partition struct {
|
||||
Id uuid.UUID
|
||||
ParentId uuid.UUID
|
||||
LowerLimit int64
|
||||
UpperLimit int64
|
||||
IsLowerLimitInclusive bool
|
||||
ShouldUseRange bool
|
||||
RetryCounter int
|
||||
type PartitionRange struct {
|
||||
Min int64
|
||||
Max int64
|
||||
IsMinInclusive bool
|
||||
IsMaxInclusive bool
|
||||
}
|
||||
|
||||
type Partition struct {
|
||||
Id uuid.UUID
|
||||
ParentId uuid.UUID
|
||||
Range PartitionRange
|
||||
HasRange bool
|
||||
RetryCounter int
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user