refactor: update column field names in ExtractionQuery struct; enhance logging in consume and process methods

This commit is contained in:
2026-04-27 00:25:48 -05:00
parent 52fe083ab7
commit 00459e42e6
7 changed files with 16 additions and 7 deletions

View File

@@ -182,10 +182,10 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
sbQuery.WriteString("SELECT ") sbQuery.WriteString("SELECT ")
if len(q.columns) == 0 { if len(q.Columns) == 0 {
sbQuery.WriteString("*") sbQuery.WriteString("*")
} else { } else {
for i, col := range q.columns { for i, col := range q.Columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name()) fmt.Fprintf(&sbQuery, "[%s]", col.Name())
switch col.Type() { switch col.Type() {
@@ -193,7 +193,7 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name()) fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
} }
if i < len(q.columns)-1 { if i < len(q.Columns)-1 {
sbQuery.WriteString(", ") sbQuery.WriteString(", ")
} }
} }

View File

@@ -135,10 +135,10 @@ func (pw *postgresDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQu
sbQuery.WriteString("SELECT ") sbQuery.WriteString("SELECT ")
if len(q.columns) == 0 { if len(q.Columns) == 0 {
sbQuery.WriteString("*") sbQuery.WriteString("*")
} else { } else {
for i, col := range q.columns { for i, col := range q.Columns {
switch col.Type() { switch col.Type() {
case "GEOMETRY": case "GEOMETRY":
fmt.Fprintf(&sbQuery, `ST_AsEWKB("%s") AS "%s"`, col.Name(), col.Name()) fmt.Fprintf(&sbQuery, `ST_AsEWKB("%s") AS "%s"`, col.Name(), col.Name())
@@ -146,7 +146,7 @@ func (pw *postgresDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQu
fmt.Fprintf(&sbQuery, `"%s"`, col.Name()) fmt.Fprintf(&sbQuery, `"%s"`, col.Name())
} }
if i < len(q.columns)-1 { if i < len(q.Columns)-1 {
sbQuery.WriteString(", ") sbQuery.WriteString(", ")
} }
} }

View File

@@ -36,7 +36,7 @@ type ExtractionQuery struct {
Schema string Schema string
Table string Table string
PrimaryKey string PrimaryKey string
columns []models.ColumnType Columns []models.ColumnType
LowerLimit ExtractorQueryLimit LowerLimit ExtractorQueryLimit
UpperLimit ExtractorQueryLimit UpperLimit ExtractorQueryLimit
} }

View File

@@ -66,6 +66,8 @@ func (ex *GenericExtractor) Consume(
) )
if rowsReadResult > 0 { if rowsReadResult > 0 {
// current := atomic.LoadInt64(rowsRead)
// logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsRead, int64(rowsReadResult)) atomic.AddInt64(rowsRead, int64(rowsReadResult))
} }

View File

@@ -54,6 +54,7 @@ func (ex *GenericExtractor) ProcessPartition(
Schema: tableInfo.Schema, Schema: tableInfo.Schema,
Table: tableInfo.Table, Table: tableInfo.Table,
PrimaryKey: tableInfo.PrimaryKey, PrimaryKey: tableInfo.PrimaryKey,
Columns: columns,
LowerLimit: dbwrapper.ExtractorQueryLimit{ LowerLimit: dbwrapper.ExtractorQueryLimit{
IsValid: partition.HasRange && partition.Range.Min > 0, IsValid: partition.HasRange && partition.Range.Min > 0,
IsInclusive: partition.Range.IsMinInclusive, IsInclusive: partition.Range.IsMinInclusive,
@@ -66,6 +67,7 @@ func (ex *GenericExtractor) ProcessPartition(
}, },
} }
// logrus.Debugf("Querying with: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
rows, err := ex.db.QueryFromObject(ctx, query) rows, err := ex.db.QueryFromObject(ctx, query)
if err != nil { if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/sirupsen/logrus"
) )
func PartitionRangeGenerator( func PartitionRangeGenerator(
@@ -32,6 +33,7 @@ func PartitionRangeGenerator(
} }
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo) rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -51,5 +53,7 @@ func PartitionRangeGenerator(
return nil, err return nil, err
} }
// logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table)
return partitions, nil return partitions, nil
} }

View File

@@ -234,6 +234,7 @@ ORDER BY batch_id`,
RetryCounter: 0, RetryCounter: 0,
Range: models.PartitionRange{ Range: models.PartitionRange{
IsMinInclusive: true, IsMinInclusive: true,
IsMaxInclusive: true,
}, },
} }