diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index 6422c01..e2b1300 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -30,6 +30,8 @@ func buildExtractQueryMssql( includeRange bool, isMinInclusive bool, isMaxInclusive bool, + hasMin bool, + hasMax bool, ) string { var sbQuery strings.Builder @@ -53,23 +55,32 @@ func buildExtractQueryMssql( fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table) - if includeRange { - fmt.Fprintf(&sbQuery, " WHERE [%s]", tableInfo.PrimaryKey) - if isMinInclusive { - sbQuery.WriteString(" >=") - } else { - sbQuery.WriteString(" >") + if includeRange && (hasMin || hasMax) { + sbQuery.WriteString(" WHERE ") + + if hasMin { + fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey) + if isMinInclusive { + sbQuery.WriteString(" >=") + } else { + sbQuery.WriteString(" >") + } + sbQuery.WriteString(" @min") } - sbQuery.WriteString(" @min AND ") - fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey) - if isMaxInclusive { - sbQuery.WriteString(" <=") - } else { - sbQuery.WriteString(" <") + if hasMin && hasMax { + sbQuery.WriteString(" AND ") } - sbQuery.WriteString(" @max") + if hasMax { + fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey) + if isMaxInclusive { + sbQuery.WriteString(" <=") + } else { + sbQuery.WriteString(" <") + } + sbQuery.WriteString(" @max") + } } fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey) @@ -114,14 +125,16 @@ func (mssqlEx *MssqlExtractor) Exec( indexPrimaryKey int, chBatchesOut chan<- models.Batch, ) (int, error) { - query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive) + hasMin := partition.HasRange && partition.Range.Min > 0 + hasMax := partition.HasRange && partition.Range.Max > 0 + query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax) var queryArgs []any - if partition.HasRange { - queryArgs = append(queryArgs, - sql.Named("min", partition.Range.Min), - sql.Named("max", partition.Range.Max), - ) + if hasMin { + queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min)) + } + if hasMax { + queryArgs = append(queryArgs, sql.Named("max", partition.Range.Max)) } rowsRead := 0 diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go index f83dc79..964f940 100644 --- a/internal/app/etl/extractors/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -27,6 +27,8 @@ func buildExtractQueryPostgres( includeRange bool, isMinInclusive bool, isMaxInclusive bool, + hasMin bool, + hasMax bool, ) string { var sbColumns strings.Builder @@ -54,20 +56,34 @@ func buildExtractQueryPostgres( query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table) - if includeRange { - query += fmt.Sprintf(` WHERE "%s"`, sourceDbInfo.PrimaryKey) - if isMinInclusive { - query += " >=" - } else { - query += " >" + if includeRange && (hasMin || hasMax) { + query += " WHERE " + paramIdx := 1 + + if hasMin { + query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey) + if isMinInclusive { + query += " >=" + } else { + query += " >" + } + query += fmt.Sprintf(" $%d", paramIdx) + paramIdx++ } - query += " $1 AND " + fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey) - if isMaxInclusive { - query += " <=" - } else { - query += " <" + + if hasMin && hasMax { + query += " AND " + } + + if hasMax { + query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey) + if isMaxInclusive { + query += " <=" + } else { + query += " <" + } + query += fmt.Sprintf(" $%d", paramIdx) } - query += " $2" } query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey) @@ -84,14 +100,16 @@ func (postgresEx *PostgresExtractor) Exec( indexPrimaryKey int, chBatchesOut chan<- models.Batch, ) (int, error) { - query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive) + hasMin := partition.HasRange && partition.Range.Min > 0 + hasMax := partition.HasRange && partition.Range.Max > 0 + query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax) var queryArgs []any - if partition.HasRange { - queryArgs = append(queryArgs, - partition.Range.Min, - partition.Range.Max, - ) + if hasMin { + queryArgs = append(queryArgs, partition.Range.Min) + } + if hasMax { + queryArgs = append(queryArgs, partition.Range.Max) } rowsRead := 0 diff --git a/internal/app/etl/table_analyzers/main.go b/internal/app/etl/table_analyzers/main.go index 3d14dc7..cb08390 100644 --- a/internal/app/etl/table_analyzers/main.go +++ b/internal/app/etl/table_analyzers/main.go @@ -17,7 +17,7 @@ func PartitionRangeGenerator( rowsPerPartition int64, jobRange config.RangeConfig, ) ([]models.Partition, error) { - if jobRange.Max > 0 { + if jobRange.Min > 0 { return []models.Partition{{ Id: uuid.New(), HasRange: true,