feat: add MSSQL table analyzer and integrate partition range generation for improved data migration

This commit is contained in:
2026-04-11 01:23:13 -05:00
parent 7830ae862d
commit 4d3cd6e4cf
5 changed files with 89 additions and 40 deletions

View File

@@ -9,6 +9,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -90,6 +91,7 @@ func processMigrationJobs(
chJobs := make(chan config.Job, len(jobs)) chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup var wgJobs sync.WaitGroup
tableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer() transformer := transformers.NewMssqlTransformer()
loader := loaders.NewPostgresLoader(targetDb) loader := loaders.NewPostgresLoader(targetDb)
@@ -102,6 +104,7 @@ func processMigrationJobs(
ctx, ctx,
sourceDb, sourceDb,
targetDb, targetDb,
tableAnalyzer,
extractor, extractor,
transformer, transformer,
loader, loader,

View File

@@ -10,6 +10,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb" _ "github.com/microsoft/go-mssqldb"
@@ -20,6 +21,7 @@ func processMigrationJob(
ctx context.Context, ctx context.Context,
sourceDb *sql.DB, sourceDb *sql.DB,
targetDb *pgxpool.Pool, targetDb *pgxpool.Pool,
tableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor, extractor etl.Extractor,
transformer etl.Transformer, transformer etl.Transformer,
loader etl.Loader, loader etl.Loader,
@@ -44,7 +46,13 @@ func processMigrationJob(
jobCtx, cancel := context.WithCancel(ctx) jobCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
partitions, err := partitionGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerPartition) partitions, err := table_analyzers.PartitionRangeGenerator(
jobCtx,
tableAnalyzer,
job.SourceTable.TableInfo,
job.SourceTable.PrimaryKey,
job.RowsPerPartition,
)
if err != nil { if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
} }

View File

@@ -0,0 +1,40 @@
package table_analyzers
import (
"context"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func PartitionRangeGenerator(
ctx context.Context,
tableAnalyzer etl.TableAnalyzer,
tableInfo config.TableInfo,
partitionColumn string,
rowsPerPartition int64,
) ([]models.Partition, error) {
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
if err != nil {
return nil, err
}
if rowsCount <= rowsPerPartition {
return []models.Partition{{
Id: uuid.New(),
ShouldUseRange: false,
RetryCounter: 0,
}}, nil
}
partitionsCount := rowsCount / rowsPerPartition
partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount)
if err != nil {
return nil, err
}
return partitions, nil
}

View File

@@ -1,4 +1,4 @@
package main package table_analyzers
import ( import (
"context" "context"
@@ -7,14 +7,32 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) (int64, error) { type MssqlTableAnalyzer struct {
db *sql.DB
}
func NewMssqlTableAnalyzer(db *sql.DB) etl.TableAnalyzer {
return &MssqlTableAnalyzer{db: db}
}
func (ta *MssqlTableAnalyzer) QueryColumnTypes(
ctx context.Context,
tableInfo config.TableInfo,
) ([]models.ColumnType, error) {
return []models.ColumnType{}, nil
}
func (ta *MssqlTableAnalyzer) EstimateTotalRows(
ctx context.Context,
tableInfo config.TableInfo,
) (int64, error) {
query := ` query := `
SELECT SELECT SUM(p.rows) AS count
SUM(p.rows) AS count
FROM sys.tables t FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id JOIN sys.schemas s ON t.schema_id = s.schema_id
JOIN sys.partitions p ON t.object_id = p.object_id JOIN sys.partitions p ON t.object_id = p.object_id
@@ -25,7 +43,7 @@ GROUP BY t.name`
defer cancel() defer cancel()
var rowsCount int64 var rowsCount int64
err := db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount) err := ta.db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -33,26 +51,30 @@ GROUP BY t.name`
return rowsCount, nil return rowsCount, nil
} }
func calculatePartitionRanges(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, maxPartitions int64) ([]models.Partition, error) { func (ta *MssqlTableAnalyzer) CalculatePartitionRanges(
ctx context.Context,
tableInfo config.TableInfo,
partitionColumn string,
maxPartitions int64,
) ([]models.Partition, error) {
query := fmt.Sprintf(` query := fmt.Sprintf(`
SELECT SELECT
MIN([%s]) AS lower_limit, MIN([%s]) AS lower_limit,
MAX([%s]) AS upper_limit MAX([%s]) AS upper_limit
FROM FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T
(SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T
GROUP BY batch_id GROUP BY batch_id
ORDER BY batch_id`, ORDER BY batch_id`,
tableInfo.PrimaryKey, partitionColumn,
tableInfo.PrimaryKey, partitionColumn,
tableInfo.PrimaryKey, partitionColumn,
tableInfo.PrimaryKey, partitionColumn,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table) tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel() defer cancel()
rows, err := db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) rows, err := ta.db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -81,28 +103,3 @@ ORDER BY batch_id`,
return partitions, nil return partitions, nil
} }
func partitionGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerPartition int64) ([]models.Partition, error) {
rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo)
if err != nil {
return nil, err
}
var partitionsCount int64 = 1
if rowsCount > rowsPerPartition {
partitionsCount = rowsCount / rowsPerPartition
} else {
return []models.Partition{{
Id: uuid.New(),
ShouldUseRange: false,
RetryCounter: 0,
}}, nil
}
partitions, err := calculatePartitionRanges(ctx, db, tableInfo, partitionsCount)
if err != nil {
return nil, err
}
return partitions, nil
}

View File

@@ -93,6 +93,7 @@ type TableAnalyzer interface {
CalculatePartitionRanges( CalculatePartitionRanges(
ctx context.Context, ctx context.Context,
tableInfo config.TableInfo, tableInfo config.TableInfo,
maxPartitions int, partitionColumn string,
maxPartitions int64,
) ([]models.Partition, error) ) ([]models.Partition, error)
} }