From 4d3cd6e4cf8a26e9d4b095da85337e56301bd17f Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Sat, 11 Apr 2026 01:23:13 -0500 Subject: [PATCH] feat: add MSSQL table analyzer and integrate partition range generation for improved data migration --- cmd/go_migrate/main.go | 3 + cmd/go_migrate/process.go | 10 ++- internal/app/etl/table_analyzers/main.go | 40 ++++++++++ .../app/etl/table_analyzers/mssql.go | 73 +++++++++---------- internal/app/etl/types.go | 3 +- 5 files changed, 89 insertions(+), 40 deletions(-) create mode 100644 internal/app/etl/table_analyzers/main.go rename cmd/go_migrate/batch-generator.go => internal/app/etl/table_analyzers/mssql.go (50%) diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 6afdf01..4253dd6 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -9,6 +9,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" @@ -90,6 +91,7 @@ func processMigrationJobs( chJobs := make(chan config.Job, len(jobs)) var wgJobs sync.WaitGroup + tableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb) transformer := transformers.NewMssqlTransformer() loader := loaders.NewPostgresLoader(targetDb) @@ -102,6 +104,7 @@ func processMigrationJobs( ctx, sourceDb, targetDb, + tableAnalyzer, extractor, transformer, loader, diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 75ef517..3a8b27a 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -10,6 +10,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" @@ -20,6 +21,7 @@ func processMigrationJob( ctx context.Context, sourceDb *sql.DB, targetDb *pgxpool.Pool, + tableAnalyzer etl.TableAnalyzer, extractor etl.Extractor, transformer etl.Transformer, loader etl.Loader, @@ -44,7 +46,13 @@ func processMigrationJob( jobCtx, cancel := context.WithCancel(ctx) defer cancel() - partitions, err := partitionGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerPartition) + partitions, err := table_analyzers.PartitionRangeGenerator( + jobCtx, + tableAnalyzer, + job.SourceTable.TableInfo, + job.SourceTable.PrimaryKey, + job.RowsPerPartition, + ) if err != nil { log.Error("Unexpected error calculating batch ranges: ", err) } diff --git a/internal/app/etl/table_analyzers/main.go b/internal/app/etl/table_analyzers/main.go new file mode 100644 index 0000000..1ed61d0 --- /dev/null +++ b/internal/app/etl/table_analyzers/main.go @@ -0,0 +1,40 @@ +package table_analyzers + +import ( + "context" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" +) + +func PartitionRangeGenerator( + ctx context.Context, + tableAnalyzer etl.TableAnalyzer, + tableInfo config.TableInfo, + partitionColumn string, + rowsPerPartition int64, +) ([]models.Partition, error) { + rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo) + if err != nil { + return nil, err + } + + if rowsCount <= rowsPerPartition { + return []models.Partition{{ + Id: uuid.New(), + ShouldUseRange: false, + RetryCounter: 0, + }}, nil + + } + + partitionsCount := rowsCount / rowsPerPartition + partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount) + if err != nil { + return nil, err + } + + return partitions, nil +} diff --git a/cmd/go_migrate/batch-generator.go b/internal/app/etl/table_analyzers/mssql.go similarity index 50% rename from cmd/go_migrate/batch-generator.go rename to internal/app/etl/table_analyzers/mssql.go index 6c7a213..f72c63f 100644 --- a/cmd/go_migrate/batch-generator.go +++ b/internal/app/etl/table_analyzers/mssql.go @@ -1,4 +1,4 @@ -package main +package table_analyzers import ( "context" @@ -7,14 +7,32 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" ) -func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) (int64, error) { +type MssqlTableAnalyzer struct { + db *sql.DB +} + +func NewMssqlTableAnalyzer(db *sql.DB) etl.TableAnalyzer { + return &MssqlTableAnalyzer{db: db} +} + +func (ta *MssqlTableAnalyzer) QueryColumnTypes( + ctx context.Context, + tableInfo config.TableInfo, +) ([]models.ColumnType, error) { + return []models.ColumnType{}, nil +} + +func (ta *MssqlTableAnalyzer) EstimateTotalRows( + ctx context.Context, + tableInfo config.TableInfo, +) (int64, error) { query := ` -SELECT - SUM(p.rows) AS count +SELECT SUM(p.rows) AS count FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id JOIN sys.partitions p ON t.object_id = p.object_id @@ -25,7 +43,7 @@ GROUP BY t.name` defer cancel() var rowsCount int64 - err := db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount) + err := ta.db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount) if err != nil { return 0, err } @@ -33,26 +51,30 @@ GROUP BY t.name` return rowsCount, nil } -func calculatePartitionRanges(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, maxPartitions int64) ([]models.Partition, error) { +func (ta *MssqlTableAnalyzer) CalculatePartitionRanges( + ctx context.Context, + tableInfo config.TableInfo, + partitionColumn string, + maxPartitions int64, +) ([]models.Partition, error) { query := fmt.Sprintf(` SELECT MIN([%s]) AS lower_limit, MAX([%s]) AS upper_limit -FROM - (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T +FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T GROUP BY batch_id ORDER BY batch_id`, - tableInfo.PrimaryKey, - tableInfo.PrimaryKey, - tableInfo.PrimaryKey, - tableInfo.PrimaryKey, + partitionColumn, + partitionColumn, + partitionColumn, + partitionColumn, tableInfo.Schema, tableInfo.Table) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20) defer cancel() - rows, err := db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) + rows, err := ta.db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) if err != nil { return nil, err } @@ -81,28 +103,3 @@ ORDER BY batch_id`, return partitions, nil } - -func partitionGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerPartition int64) ([]models.Partition, error) { - rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo) - if err != nil { - return nil, err - } - - var partitionsCount int64 = 1 - if rowsCount > rowsPerPartition { - partitionsCount = rowsCount / rowsPerPartition - } else { - return []models.Partition{{ - Id: uuid.New(), - ShouldUseRange: false, - RetryCounter: 0, - }}, nil - } - - partitions, err := calculatePartitionRanges(ctx, db, tableInfo, partitionsCount) - if err != nil { - return nil, err - } - - return partitions, nil -} diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index 09f4a9c..a2c07a5 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -93,6 +93,7 @@ type TableAnalyzer interface { CalculatePartitionRanges( ctx context.Context, tableInfo config.TableInfo, - maxPartitions int, + partitionColumn string, + maxPartitions int64, ) ([]models.Partition, error) }