refactor: update partition handling to use Range struct for better clarity and consistency
This commit is contained in:
@@ -103,8 +103,8 @@ func ExtractorErrorHandler(
|
|||||||
if err.HasLastId {
|
if err.HasLastId {
|
||||||
newPartition.ParentId = err.Partition.Id
|
newPartition.ParentId = err.Partition.Id
|
||||||
newPartition.Id = uuid.New()
|
newPartition.Id = uuid.New()
|
||||||
newPartition.LowerLimit = err.LastId
|
newPartition.Range.Min = err.LastId
|
||||||
newPartition.IsLowerLimitInclusive = false
|
newPartition.Range.IsMinInclusive = false
|
||||||
}
|
}
|
||||||
|
|
||||||
requeueWithBackoff(ctx, delay, func() {
|
requeueWithBackoff(ctx, delay, func() {
|
||||||
|
|||||||
@@ -107,13 +107,13 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
|
|||||||
indexPrimaryKey int,
|
indexPrimaryKey int,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
) (int, error) {
|
) (int, error) {
|
||||||
query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive)
|
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
|
||||||
|
|
||||||
var queryArgs []any
|
var queryArgs []any
|
||||||
if partition.ShouldUseRange {
|
if partition.HasRange {
|
||||||
queryArgs = append(queryArgs,
|
queryArgs = append(queryArgs,
|
||||||
sql.Named("min", partition.LowerLimit),
|
sql.Named("min", partition.Range.Min),
|
||||||
sql.Named("max", partition.UpperLimit),
|
sql.Named("max", partition.Range.Max),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
|
|||||||
) (int, error) {
|
) (int, error) {
|
||||||
query := buildExtractQueryPostgres(tableInfo, columns)
|
query := buildExtractQueryPostgres(tableInfo, columns)
|
||||||
|
|
||||||
if partition.ShouldUseRange {
|
if partition.HasRange {
|
||||||
return 0, errors.New("Batch config not yet supported")
|
return 0, errors.New("Batch config not yet supported")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ func PartitionRangeGenerator(
|
|||||||
if rowsCount <= rowsPerPartition {
|
if rowsCount <= rowsPerPartition {
|
||||||
return []models.Partition{{
|
return []models.Partition{{
|
||||||
Id: uuid.New(),
|
Id: uuid.New(),
|
||||||
ShouldUseRange: false,
|
HasRange: false,
|
||||||
RetryCounter: 0,
|
RetryCounter: 0,
|
||||||
}}, nil
|
}}, nil
|
||||||
|
|
||||||
|
|||||||
@@ -229,12 +229,14 @@ ORDER BY batch_id`,
|
|||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
partition := models.Partition{
|
partition := models.Partition{
|
||||||
Id: uuid.New(),
|
Id: uuid.New(),
|
||||||
ShouldUseRange: true,
|
HasRange: true,
|
||||||
RetryCounter: 0,
|
RetryCounter: 0,
|
||||||
IsLowerLimitInclusive: true,
|
Range: models.PartitionRange{
|
||||||
|
IsMinInclusive: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := rows.Scan(&partition.LowerLimit, &partition.UpperLimit); err != nil {
|
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,12 +11,17 @@ type Batch struct {
|
|||||||
RetryCounter int
|
RetryCounter int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PartitionRange struct {
|
||||||
|
Min int64
|
||||||
|
Max int64
|
||||||
|
IsMinInclusive bool
|
||||||
|
IsMaxInclusive bool
|
||||||
|
}
|
||||||
|
|
||||||
type Partition struct {
|
type Partition struct {
|
||||||
Id uuid.UUID
|
Id uuid.UUID
|
||||||
ParentId uuid.UUID
|
ParentId uuid.UUID
|
||||||
LowerLimit int64
|
Range PartitionRange
|
||||||
UpperLimit int64
|
HasRange bool
|
||||||
IsLowerLimitInclusive bool
|
|
||||||
ShouldUseRange bool
|
|
||||||
RetryCounter int
|
RetryCounter int
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user