refactor: remove unused ColumnType and inspect-columns files; update migration job to use separate table analyzers for source and target databases

This commit is contained in:
2026-04-12 19:16:14 -05:00
parent aded502ee4
commit 01780b4b02
6 changed files with 52 additions and 387 deletions

View File

@@ -1,44 +0,0 @@
package main
// ColumnType describes one database column's type metadata, mirroring
// the accessor shape of database/sql's ColumnType. The has* flags
// record whether the corresponding sizing fields carry a real value
// for this column's type family.
type ColumnType struct {
	name              string // column name as reported by the catalog
	hasMaxLength      bool   // true when maxLength is meaningful (string-like types)
	hasPrecisionScale bool   // true when precision/scale are meaningful (decimal types)
	userType          string // user-facing / declared type name
	systemType        string // underlying system type name
	unifiedType       string // database-agnostic label (set via GetUnifiedType)
	nullable          bool   // whether the column accepts NULL
	maxLength         int64  // character length; -1 when not applicable
	precision         int64  // numeric precision; -1 when not applicable
	scale             int64  // numeric scale; -1 when not applicable
}
// Name returns the column's name.
func (c *ColumnType) Name() string {
	return c.name
}
// UserType returns the user-facing (declared) type name of the column.
func (c *ColumnType) UserType() string {
	return c.userType
}
// SystemType returns the underlying system type name of the column.
func (c *ColumnType) SystemType() string {
	return c.systemType
}
// Length returns the column's maximum character length; ok reports
// whether a length applies to this column's type.
func (c *ColumnType) Length() (length int64, ok bool) {
	return c.maxLength, c.hasMaxLength
}
// DecimalSize returns the column's numeric precision and scale; ok
// reports whether precision/scale apply to this column's type.
func (c *ColumnType) DecimalSize() (precision, scale int64, ok bool) {
	return c.precision, c.scale, c.hasPrecisionScale
}
// Nullable reports whether the column accepts NULL values.
func (c *ColumnType) Nullable() bool {
	return c.nullable
}
// Type returns the database-agnostic unified type label for the column.
func (c *ColumnType) Type() string {
	return c.unifiedType
}

View File

@@ -1,316 +0,0 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
// GetUnifiedType maps a database-specific system type name (MSSQL or
// Postgres) to a database-agnostic unified label such as "STRING" or
// "INTEGER", so source and target column types can be compared.
// Matching is case-insensitive; unknown types fall through and are
// returned upper-cased as-is.
func GetUnifiedType(systemType string) string {
	systemType = strings.ToLower(systemType)
	switch systemType {
	case "varchar", "char", "nvarchar", "nchar", "text", "ntext":
		return "STRING"
	case "int", "int4", "integer", "smallint", "int2", "bigint", "int8", "tinyint":
		return "INTEGER"
	case "decimal", "numeric":
		return "DECIMAL"
	case "float", "real", "double precision":
		return "FLOAT"
	case "bit", "boolean":
		return "BOOLEAN"
	case "date":
		return "DATE"
	case "time", "time without time zone":
		return "TIME"
	case "datetime", "datetime2", "timestamp", "timestamptz", "timestamp with time zone":
		return "TIMESTAMP"
	case "binary", "varbinary", "image", "bytea":
		return "BINARY"
	case "uniqueidentifier", "uuid":
		return "UUID"
	case "json":
		return "JSON"
	case "geometry", "geography":
		return "GEOMETRY"
	default:
		// No known mapping: surface the raw type name, upper-cased for
		// consistency with the unified labels above.
		return strings.ToUpper(systemType)
	}
}
// MapPostgresColumn normalizes a scanned Postgres column into a
// models.ColumnType. Length metadata is kept only for string-like
// types and precision/scale only for decimal types; everything else
// has its sizing fields cleared to -1. The maxLength/precision/scale
// pointers come straight from information_schema and are nil when the
// catalog reported NULL.
func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, scale *int64) models.ColumnType {
	// Start from "no sizing applies" and let the matching type family
	// re-enable exactly the fields it carries.
	column.hasMaxLength = false
	column.maxLength = -1
	column.hasPrecisionScale = false
	column.precision = -1
	column.scale = -1
	switch column.systemType {
	case "varchar", "char", "character", "text", "character varying":
		if maxLength != nil {
			column.hasMaxLength = true
			column.maxLength = *maxLength
		}
	case "decimal", "numeric":
		if precision != nil && scale != nil {
			column.hasPrecisionScale = true
			column.precision = *precision
			column.scale = *scale
		}
	}
	column.unifiedType = GetUnifiedType(column.systemType)
	return models.NewColumnType(
		column.name,
		column.hasMaxLength,
		column.hasPrecisionScale,
		column.userType,
		column.systemType,
		column.unifiedType,
		column.nullable,
		column.maxLength,
		column.precision,
		column.scale,
	)
}
// GetColumnTypesPostgres reads column metadata for the target table
// from information_schema.columns and returns one models.ColumnType
// per column, in ordinal order. The query is bounded by a 20 second
// timeout.
func GetColumnTypesPostgres(db *pgxpool.Pool, tableInfo config.TargetTableInfo) ([]models.ColumnType, error) {
	query := `
SELECT
	c.column_name AS name,
	c.data_type AS user_type,
	c.udt_name AS system_type,
	(CASE WHEN c.is_nullable = 'YES' THEN TRUE ELSE FALSE END) AS nullable,
	c.character_maximum_length AS max_length,
	c.numeric_precision AS precision,
	c.numeric_scale AS scale
FROM information_schema.columns c
WHERE c.table_schema = $1 AND c.table_name = $2
ORDER BY c.ordinal_position;
`
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	rows, err := db.Query(ctx, query, tableInfo.Schema, tableInfo.Table)
	if err != nil {
		return nil, fmt.Errorf("querying column types: %w", err)
	}
	defer rows.Close()
	var colTypes []models.ColumnType
	for rows.Next() {
		var column ColumnType
		// Pointer destinations so NULL max_length/precision/scale scan as nil.
		var scanMaxLength, scanPrecision, scanScale *int64
		if err := rows.Scan(
			&column.name,
			&column.userType,
			&column.systemType,
			&column.nullable,
			&scanMaxLength,
			&scanPrecision,
			&scanScale,
		); err != nil {
			return nil, fmt.Errorf("scanning column type results: %w", err)
		}
		colTypes = append(colTypes, MapPostgresColumn(column, scanMaxLength, scanPrecision, scanScale))
	}
	// rows.Next returning false can mean an iteration error, not just
	// end-of-results; without this check such errors were silently lost.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating column type rows: %w", err)
	}
	return colTypes, nil
}
// MapMssqlColumn normalizes a scanned SQL Server column into a
// models.ColumnType. String-like types keep their length (halved for
// UTF-16 nvarchar/nchar, whose sys.columns max_length is in bytes),
// decimal types keep precision/scale, and all other types have their
// sizing fields cleared to -1.
func MapMssqlColumn(column ColumnType) models.ColumnType {
	switch column.systemType {
	case "varchar", "char", "nvarchar", "nchar", "text", "ntext":
		column.hasMaxLength = true
		// nvarchar/nchar report byte widths; convert to character counts.
		// A non-positive length (e.g. nvarchar(max)) is left untouched.
		if (column.systemType == "nvarchar" || column.systemType == "nchar") && column.maxLength > 0 {
			column.maxLength /= 2
		}
		column.hasPrecisionScale = false
		column.precision = -1
		column.scale = -1
	case "decimal", "numeric":
		column.hasMaxLength = false
		column.maxLength = -1
		column.hasPrecisionScale = true
	default:
		column.hasMaxLength = false
		column.maxLength = -1
		column.hasPrecisionScale = false
		column.precision = -1
		column.scale = -1
	}
	column.unifiedType = GetUnifiedType(column.systemType)
	return models.NewColumnType(
		column.name,
		column.hasMaxLength,
		column.hasPrecisionScale,
		column.userType,
		column.systemType,
		column.unifiedType,
		column.nullable,
		column.maxLength,
		column.precision,
		column.scale,
	)
}
// GetColumnTypesMssql reads column metadata for the source table from
// the SQL Server system catalogs and returns one models.ColumnType per
// column, in column_id order. Columns whose name starts with
// "graph_id" and whose system type is bigint are skipped (presumably
// SQL Server graph-table internals — confirmed by the later catalog
// query filtering `c.name NOT LIKE 'graph_id%'`). The query is bounded
// by a 20 second timeout.
func GetColumnTypesMssql(db *sql.DB, tableInfo config.SourceTableInfo) ([]models.ColumnType, error) {
	query := `
SELECT
	c.name AS name,
	t.name AS user_type,
	CASE WHEN t.is_user_defined = 0 THEN t.name ELSE bt.name END AS system_type,
	c.is_nullable AS nullable,
	c.max_length AS max_length,
	c.precision AS precision,
	c.scale AS scale
FROM sys.columns c
JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table
ORDER BY c.column_id;
`
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	rows, err := db.QueryContext(ctx, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
	if err != nil {
		return nil, fmt.Errorf("querying column types: %w", err)
	}
	defer rows.Close()
	var colTypes []models.ColumnType
	for rows.Next() {
		var column ColumnType
		if err := rows.Scan(
			&column.name,
			&column.userType,
			&column.systemType,
			&column.nullable,
			&column.maxLength,
			&column.precision,
			&column.scale,
		); err != nil {
			// Was "%W" — an invalid verb that printed %!W(...) and broke
			// error unwrapping; %w is the wrap verb.
			return nil, fmt.Errorf("scanning column type results: %w", err)
		}
		if strings.HasPrefix(column.name, "graph_id") && column.systemType == "bigint" {
			continue
		}
		colTypes = append(colTypes, MapMssqlColumn(column))
	}
	// Surface iteration errors that rows.Next reports only via rows.Err.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating column type rows: %w", err)
	}
	return colTypes, nil
}
// GetColumnTypes fetches source (MSSQL) and target (Postgres) column
// metadata concurrently and returns both slices. On failure the
// underlying error(s) are returned joined, so callers can see which
// side failed instead of a generic message.
func GetColumnTypes(
	sourceDb *sql.DB,
	targetDb *pgxpool.Pool,
	sourceTable config.SourceTableInfo,
	targetTable config.TargetTableInfo,
) ([]models.ColumnType, []models.ColumnType, error) {
	var (
		sourceColTypes, targetColTypes []models.ColumnType
		sourceDbErr, targetDbErr       error
		wg                             sync.WaitGroup
	)
	wg.Go(func() {
		sourceColTypes, sourceDbErr = GetColumnTypesMssql(sourceDb, sourceTable)
		if sourceDbErr != nil {
			log.Error("Error (sourceDb): ", sourceDbErr)
		}
	})
	wg.Go(func() {
		targetColTypes, targetDbErr = GetColumnTypesPostgres(targetDb, targetTable)
		if targetDbErr != nil {
			log.Error("Error (targetDb): ", targetDbErr)
		}
	})
	wg.Wait()
	// errors.Join returns nil when both errors are nil; previously the
	// real causes were discarded in favor of errors.New("Error querying
	// column types").
	if err := errors.Join(sourceDbErr, targetDbErr); err != nil {
		return nil, nil, fmt.Errorf("querying column types: %w", err)
	}
	return sourceColTypes, targetColTypes, nil
}

View File

@@ -91,7 +91,8 @@ func processMigrationJobs(
chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup
tableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer()
loader := loaders.NewPostgresLoader(targetDb)
@@ -102,9 +103,8 @@ func processMigrationJobs(
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
res := processMigrationJob(
ctx,
sourceDb,
targetDb,
tableAnalyzer,
sourceTableAnalyzer,
targetTableAnalyzer,
extractor,
transformer,
loader,

View File

@@ -2,7 +2,6 @@ package main
import (
"context"
"database/sql"
"sync"
"sync/atomic"
"time"
@@ -12,21 +11,22 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
func processMigrationJob(
ctx context.Context,
sourceDb *sql.DB,
targetDb *pgxpool.Pool,
tableAnalyzer etl.TableAnalyzer,
sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor,
transformer etl.Transformer,
loader etl.Loader,
job config.Job,
) JobResult {
jobCtx, cancel := context.WithCancel(ctx)
defer cancel()
result := JobResult{
JobName: job.Name,
StartTime: time.Now(),
@@ -34,21 +34,38 @@ func processMigrationJob(
var rowsRead, rowsLoaded, rowsFailed int64
sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable)
var wgQueryColumnTypes errgroup.Group
var sourceColTypes, targetColTypes []models.ColumnType
wgQueryColumnTypes.Go(func() error {
var err error
sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(jobCtx, job.SourceTable.TableInfo)
if err != nil {
return err
}
return nil
})
wgQueryColumnTypes.Go(func() error {
var err error
targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(jobCtx, job.TargetTable.TableInfo)
if err != nil {
return err
}
return nil
})
err := wgQueryColumnTypes.Wait()
if err != nil {
result.Error = err
return result
}
logColumnTypes(sourceColTypes, "Source col types")
logColumnTypes(targetColTypes, "Target col types")
jobCtx, cancel := context.WithCancel(ctx)
defer cancel()
partitions, err := table_analyzers.PartitionRangeGenerator(
jobCtx,
tableAnalyzer,
sourceTableAnalyzer,
job.SourceTable.TableInfo,
job.SourceTable.PrimaryKey,
job.RowsPerPartition,
@@ -72,6 +89,7 @@ func processMigrationJob(
go func() {
if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
cancel()
result.Error = err
}
@@ -140,25 +158,39 @@ func processMigrationJob(
}
go func() {
log.Debugf("Waiting for goroutines (%v)", job.Name)
wgActivePartitions.Wait()
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
close(chPartitions)
log.Debugf("chPartitions is closed (%v)", job.Name)
close(chExtractorErrors)
log.Debugf("chExtractorErrors is closed (%v)", job.Name)
wgExtractors.Wait()
log.Debugf("wgExtractors is empty (%v)", job.Name)
close(chBatchesRaw)
log.Debugf("chBatchesRaw is closed (%v)", job.Name)
wgTransformers.Wait()
log.Debugf("wgTransformers is empty (%v)", job.Name)
wgActiveBatches.Wait()
log.Debugf("wgActiveBatches is empty (%v)", job.Name)
close(chBatchesTransformed)
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
close(chLoadersErrors)
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
wgLoaders.Wait()
log.Debugf("wgLoaders is empty (%v)", job.Name)
cancel()
}()
log.Debugf("waiting for local context to be done (%v)", job.Name)
<-jobCtx.Done()
log.Debugf("local context done (%v)", job.Name)
if ctx.Err() != nil {
result.Error = ctx.Err()
@@ -171,11 +203,3 @@ func processMigrationJob(
return result
}
func logColumnTypes(columnTypes []models.ColumnType, label string) {
log.Debug(label)
for _, col := range columnTypes {
log.Debugf("%+v", col)
}
}

2
go.mod
View File

@@ -10,6 +10,7 @@ require (
github.com/microsoft/go-mssqldb v1.9.8
github.com/sirupsen/logrus v1.9.4
github.com/twpayne/go-geom v1.6.1
golang.org/x/sync v0.19.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -23,7 +24,6 @@ require (
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
)

View File

@@ -36,6 +36,7 @@ LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_t
JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table
AND c.name NOT LIKE 'graph_id%'
ORDER BY c.column_id;`
type rawColumnMssql struct {