From 9eb8800864b688a867ef84d847b6541364a83ede Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Tue, 21 Apr 2026 11:29:34 -0500 Subject: [PATCH] feat: add range configuration to job and update extractors for inclusive range handling --- cmd/go_migrate/process.go | 1 + internal/app/config/migration.go | 14 ++++---- internal/app/etl/extractors/mssql.go | 13 +++++-- internal/app/etl/extractors/postgres.go | 45 +++++++++++++++++++----- internal/app/etl/table_analyzers/main.go | 15 ++++++++ 5 files changed, 72 insertions(+), 16 deletions(-) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 7e972c4..19b9c27 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -80,6 +80,7 @@ func processMigrationJob( job.SourceTable.TableInfo, job.SourceTable.PrimaryKey, job.RowsPerPartition, + job.Range, ) if err != nil { log.Error("Unexpected error calculating batch ranges: ", err) diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index cfd4d06..873967b 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -42,6 +42,13 @@ type SourceTableInfo struct { PrimaryKey string `yaml:"primary_key"` } +type RangeConfig struct { + Min int64 `yaml:"min"` + Max int64 `yaml:"max"` + IsMinInclusive bool `yaml:"is_min_inclusive"` + IsMaxInclusive bool `yaml:"is_max_inclusive"` +} + type Job struct { Name string `yaml:"name"` Enabled bool `yaml:"enabled"` @@ -50,12 +57,7 @@ type Job struct { PreSQL []string `yaml:"pre_sql"` PostSQL []string `yaml:"post_sql"` JobConfig `yaml:",inline"` - Range struct { - Min int64 `yaml:"min"` - Max int64 `yaml:"max"` - IsMinInclusive bool `yaml:"is_min_inclusive"` - IsMaxInclusive bool `yaml:"is_max_inclusive"` - } + Range RangeConfig `yaml:"range"` } type MigrationConfig struct { diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index bebc50a..6422c01 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -29,6 +29,7 @@ func buildExtractQueryMssql( columns []models.ColumnType, includeRange bool, isMinInclusive bool, + isMaxInclusive bool, ) string { var sbQuery strings.Builder @@ -60,7 +61,15 @@ func buildExtractQueryMssql( sbQuery.WriteString(" >") } - fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", tableInfo.PrimaryKey) + sbQuery.WriteString(" @min AND ") + fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey) + if isMaxInclusive { + sbQuery.WriteString(" <=") + } else { + sbQuery.WriteString(" <") + } + + sbQuery.WriteString(" @max") } fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey) @@ -105,7 +114,7 @@ func (mssqlEx *MssqlExtractor) Exec( indexPrimaryKey int, chBatchesOut chan<- models.Batch, ) (int, error) { - query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive) + query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive) var queryArgs []any if partition.HasRange { diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go index 71b826d..f83dc79 100644 --- a/internal/app/etl/extractors/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -2,7 +2,6 @@ package extractors import ( "context" - "errors" "fmt" "strings" @@ -22,7 +21,13 @@ func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor { return &PostgresExtractor{db: db} } -func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string { +func buildExtractQueryPostgres( + sourceDbInfo config.SourceTableInfo, + columns []models.ColumnType, + includeRange bool, + isMinInclusive bool, + isMaxInclusive bool, +) string { var sbColumns strings.Builder if len(columns) == 0 { @@ -47,7 +52,27 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo } } - return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey) + query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table) + + if includeRange { + query += fmt.Sprintf(` WHERE "%s"`, sourceDbInfo.PrimaryKey) + if isMinInclusive { + query += " >=" + } else { + query += " >" + } + query += " $1 AND " + fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey) + if isMaxInclusive { + query += " <=" + } else { + query += " <" + } + query += " $2" + } + + query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey) + + return query } func (postgresEx *PostgresExtractor) Exec( @@ -59,14 +84,18 @@ func (postgresEx *PostgresExtractor) Exec( indexPrimaryKey int, chBatchesOut chan<- models.Batch, ) (int, error) { - query := buildExtractQueryPostgres(tableInfo, columns) + query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive) + var queryArgs []any if partition.HasRange { - return 0, errors.New("Batch config not yet supported") + queryArgs = append(queryArgs, + partition.Range.Min, + partition.Range.Max, + ) } rowsRead := 0 - rows, err := postgresEx.db.Query(ctx, query) + rows, err := postgresEx.db.Query(ctx, query, queryArgs...) if err != nil { return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} } @@ -77,7 +106,7 @@ func (postgresEx *PostgresExtractor) Exec( for rows.Next() { values, err := rows.Values() if err != nil { - return rowsRead, errors.New("Unexpected error reading rows from source") + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} } rowsRead++ @@ -95,7 +124,7 @@ func (postgresEx *PostgresExtractor) Exec( } if err := rows.Err(); err != nil { - return rowsRead, errors.New("Unexpected error reading rows from source") + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} } if len(batchRows) > 0 { diff --git a/internal/app/etl/table_analyzers/main.go b/internal/app/etl/table_analyzers/main.go index c226dbe..3d14dc7 100644 --- a/internal/app/etl/table_analyzers/main.go +++ b/internal/app/etl/table_analyzers/main.go @@ -15,7 +15,22 @@ func PartitionRangeGenerator( tableInfo config.TableInfo, partitionColumn string, rowsPerPartition int64, + jobRange config.RangeConfig, ) ([]models.Partition, error) { + if jobRange.Max > 0 { + return []models.Partition{{ + Id: uuid.New(), + HasRange: true, + RetryCounter: 0, + Range: models.PartitionRange{ + Min: jobRange.Min, + Max: jobRange.Max, + IsMinInclusive: jobRange.IsMinInclusive, + IsMaxInclusive: jobRange.IsMaxInclusive, + }, + }}, nil + } + rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo) if err != nil { return nil, err