diff --git a/internal/app/db-wrapper/mssql.go b/internal/app/db-wrapper/mssql.go index 9ced297..d8fca95 100644 --- a/internal/app/db-wrapper/mssql.go +++ b/internal/app/db-wrapper/mssql.go @@ -182,10 +182,10 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery sbQuery.WriteString("SELECT ") - if len(q.columns) == 0 { + if len(q.Columns) == 0 { sbQuery.WriteString("*") } else { - for i, col := range q.columns { + for i, col := range q.Columns { fmt.Fprintf(&sbQuery, "[%s]", col.Name()) switch col.Type() { @@ -193,7 +193,7 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name()) } - if i < len(q.columns)-1 { + if i < len(q.Columns)-1 { sbQuery.WriteString(", ") } } diff --git a/internal/app/db-wrapper/postgres.go b/internal/app/db-wrapper/postgres.go index a65c064..37cc8e6 100644 --- a/internal/app/db-wrapper/postgres.go +++ b/internal/app/db-wrapper/postgres.go @@ -135,10 +135,10 @@ func (pw *postgresDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQu sbQuery.WriteString("SELECT ") - if len(q.columns) == 0 { + if len(q.Columns) == 0 { sbQuery.WriteString("*") } else { - for i, col := range q.columns { + for i, col := range q.Columns { switch col.Type() { case "GEOMETRY": fmt.Fprintf(&sbQuery, `ST_AsEWKB("%s") AS "%s"`, col.Name(), col.Name()) @@ -146,7 +146,7 @@ func (pw *postgresDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQu fmt.Fprintf(&sbQuery, `"%s"`, col.Name()) } - if i < len(q.columns)-1 { + if i < len(q.Columns)-1 { sbQuery.WriteString(", ") } } diff --git a/internal/app/db-wrapper/types.go b/internal/app/db-wrapper/types.go index f94fd86..e194710 100644 --- a/internal/app/db-wrapper/types.go +++ b/internal/app/db-wrapper/types.go @@ -36,7 +36,7 @@ type ExtractionQuery struct { Schema string Table string PrimaryKey string - columns []models.ColumnType + Columns []models.ColumnType LowerLimit ExtractorQueryLimit UpperLimit ExtractorQueryLimit } diff --git a/internal/app/etl/extractors/consume.go b/internal/app/etl/extractors/consume.go index 2d17da5..672a9a6 100644 --- a/internal/app/etl/extractors/consume.go +++ b/internal/app/etl/extractors/consume.go @@ -66,6 +66,8 @@ func (ex *GenericExtractor) Consume( ) if rowsReadResult > 0 { + // current := atomic.LoadInt64(rowsRead) + // logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table) atomic.AddInt64(rowsRead, int64(rowsReadResult)) } diff --git a/internal/app/etl/extractors/process.go b/internal/app/etl/extractors/process.go index 2f277ce..2708908 100644 --- a/internal/app/etl/extractors/process.go +++ b/internal/app/etl/extractors/process.go @@ -54,6 +54,7 @@ func (ex *GenericExtractor) ProcessPartition( Schema: tableInfo.Schema, Table: tableInfo.Table, PrimaryKey: tableInfo.PrimaryKey, + Columns: columns, LowerLimit: dbwrapper.ExtractorQueryLimit{ IsValid: partition.HasRange && partition.Range.Min > 0, IsInclusive: partition.Range.IsMinInclusive, @@ -66,6 +67,7 @@ func (ex *GenericExtractor) ProcessPartition( }, } + // logrus.Debugf("Querying with: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table) rows, err := ex.db.QueryFromObject(ctx, query) if err != nil { diff --git a/internal/app/etl/table_analyzers/main.go b/internal/app/etl/table_analyzers/main.go index cb08390..2949c46 100644 --- a/internal/app/etl/table_analyzers/main.go +++ b/internal/app/etl/table_analyzers/main.go @@ -7,6 +7,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" + "github.com/sirupsen/logrus" ) func PartitionRangeGenerator( @@ -32,6 +33,7 @@ func PartitionRangeGenerator( } rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo) + logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table) if err != nil { return nil, err } @@ -51,5 +53,7 @@ func PartitionRangeGenerator( return nil, err } + // logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table) + return partitions, nil } diff --git a/internal/app/etl/table_analyzers/mssql.go b/internal/app/etl/table_analyzers/mssql.go index 4faaf21..39f9daf 100644 --- a/internal/app/etl/table_analyzers/mssql.go +++ b/internal/app/etl/table_analyzers/mssql.go @@ -234,6 +234,7 @@ ORDER BY batch_id`, RetryCounter: 0, Range: models.PartitionRange{ IsMinInclusive: true, + IsMaxInclusive: true, }, }