From 01780b4b02aeffa1623d8f770270f2b6396ec966 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Sun, 12 Apr 2026 19:16:14 -0500 Subject: [PATCH] refactor: remove unused ColumnType and inspect-columns files; update migration job to use separate table analyzers for source and target databases --- cmd/go_migrate/colum-type.go | 44 --- cmd/go_migrate/inspect-columns.go | 316 ---------------------- cmd/go_migrate/main.go | 8 +- cmd/go_migrate/process.go | 68 +++-- go.mod | 2 +- internal/app/etl/table_analyzers/mssql.go | 1 + 6 files changed, 52 insertions(+), 387 deletions(-) delete mode 100644 cmd/go_migrate/colum-type.go delete mode 100644 cmd/go_migrate/inspect-columns.go diff --git a/cmd/go_migrate/colum-type.go b/cmd/go_migrate/colum-type.go deleted file mode 100644 index cfd76a4..0000000 --- a/cmd/go_migrate/colum-type.go +++ /dev/null @@ -1,44 +0,0 @@ -package main - -type ColumnType struct { - name string - - hasMaxLength bool - hasPrecisionScale bool - - userType string - systemType string - unifiedType string - nullable bool - maxLength int64 - precision int64 - scale int64 -} - -func (c *ColumnType) Name() string { - return c.name -} - -func (c *ColumnType) UserType() string { - return c.userType -} - -func (c *ColumnType) SystemType() string { - return c.systemType -} - -func (c *ColumnType) Length() (length int64, ok bool) { - return c.maxLength, c.hasMaxLength -} - -func (c *ColumnType) DecimalSize() (precision, scale int64, ok bool) { - return c.precision, c.scale, c.hasPrecisionScale -} - -func (c *ColumnType) Nullable() bool { - return c.nullable -} - -func (c *ColumnType) Type() string { - return c.unifiedType -} diff --git a/cmd/go_migrate/inspect-columns.go b/cmd/go_migrate/inspect-columns.go deleted file mode 100644 index 6da878c..0000000 --- a/cmd/go_migrate/inspect-columns.go +++ /dev/null @@ -1,316 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "errors" - "fmt" - "strings" - "sync" - "time" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/jackc/pgx/v5/pgxpool" - _ "github.com/microsoft/go-mssqldb" - log "github.com/sirupsen/logrus" -) - -func GetUnifiedType(systemType string) string { - systemType = strings.ToLower(systemType) - - if systemType == "varchar" || systemType == "char" || systemType == "nvarchar" || systemType == "nchar" || systemType == "text" || systemType == "ntext" { - return "STRING" - } - - if systemType == "int" || systemType == "int4" || systemType == "integer" || systemType == "smallint" || systemType == "int2" || systemType == "bigint" || systemType == "int8" || systemType == "tinyint" { - return "INTEGER" - } - - if systemType == "decimal" || systemType == "numeric" { - return "DECIMAL" - } - - if systemType == "float" || systemType == "real" || systemType == "double precision" { - return "FLOAT" - } - - if systemType == "bit" || systemType == "boolean" { - return "BOOLEAN" - } - - if systemType == "date" { - return "DATE" - } - if systemType == "time" || systemType == "time without time zone" { - return "TIME" - } - if systemType == "datetime" || systemType == "datetime2" || systemType == "timestamp" || systemType == "timestamptz" || systemType == "timestamp with time zone" { - return "TIMESTAMP" - } - - if systemType == "binary" || systemType == "varbinary" || systemType == "image" || systemType == "bytea" { - return "BINARY" - } - - if systemType == "uniqueidentifier" || systemType == "uuid" { - return "UUID" - } - - if systemType == "json" { - return "JSON" - } - - if systemType == "geometry" || systemType == "geography" { - return "GEOMETRY" - } - - return strings.ToUpper(systemType) -} - -func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, scale *int64) models.ColumnType { - stringTypes := map[string]bool{ - "varchar": true, "char": true, "character": true, "text": true, "character varying": true, - } - - decimalTypes := map[string]bool{ - "decimal": true, "numeric": true, - } - - if stringTypes[column.systemType] { - if maxLength != nil { - column.maxLength = *maxLength - column.hasMaxLength = true - } else { - column.maxLength = -1 - column.hasMaxLength = false - } - column.hasPrecisionScale = false - column.precision = -1 - column.scale = -1 - } else if decimalTypes[column.systemType] { - column.hasMaxLength = false - column.maxLength = -1 - if precision != nil && scale != nil { - column.precision = *precision - column.scale = *scale - column.hasPrecisionScale = true - } else { - column.precision = -1 - column.scale = -1 - column.hasPrecisionScale = false - } - } else { - column.hasMaxLength = false - column.maxLength = -1 - column.hasPrecisionScale = false - column.precision = -1 - column.scale = -1 - } - - column.unifiedType = GetUnifiedType(column.systemType) - - colType := models.NewColumnType( - column.name, - column.hasMaxLength, - column.hasPrecisionScale, - column.userType, - column.systemType, - column.unifiedType, - column.nullable, - column.maxLength, - column.precision, - column.scale, - ) - - return colType -} - -func GetColumnTypesPostgres(db *pgxpool.Pool, tableInfo config.TargetTableInfo) ([]models.ColumnType, error) { - query := ` -SELECT - c.column_name AS name, - c.data_type AS user_type, - c.udt_name AS system_type, - (CASE WHEN c.is_nullable = 'YES' THEN TRUE ELSE FALSE END) AS nullable, - c.character_maximum_length AS max_length, - c.numeric_precision AS precision, - c.numeric_scale AS scale -FROM information_schema.columns c -WHERE c.table_schema = $1 AND c.table_name = $2 -ORDER BY c.ordinal_position; -` - - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - rows, err := db.Query(ctx, query, tableInfo.Schema, tableInfo.Table) - if err != nil { - return nil, fmt.Errorf("Error querying column types: %w", err) - } - defer rows.Close() - - var colTypes []models.ColumnType - - for rows.Next() { - var column ColumnType - var scanMaxLength *int64 - var scanPrecision *int64 - var scanScale *int64 - - if err := rows.Scan( - &column.name, - &column.userType, - &column.systemType, - &column.nullable, - &scanMaxLength, - &scanPrecision, - &scanScale, - ); err != nil { - return nil, fmt.Errorf("Error scanning column type results: %w", err) - } - - colTypes = append(colTypes, MapPostgresColumn(column, scanMaxLength, scanPrecision, scanScale)) - } - - return colTypes, nil -} - -func MapMssqlColumn(column ColumnType) models.ColumnType { - stringTypes := map[string]bool{ - "varchar": true, "char": true, "nvarchar": true, "nchar": true, "text": true, "ntext": true, - } - - decimalTypes := map[string]bool{ - "decimal": true, "numeric": true, - } - - if stringTypes[column.systemType] { - column.hasMaxLength = true - if column.systemType == "nvarchar" || column.systemType == "nchar" { - if column.maxLength > 0 { - column.maxLength = column.maxLength / 2 - } - } - column.hasPrecisionScale = false - column.precision = -1 - column.scale = -1 - } else if decimalTypes[column.systemType] { - column.hasMaxLength = false - column.maxLength = -1 - column.hasPrecisionScale = true - } else { - column.hasMaxLength = false - column.maxLength = -1 - column.hasPrecisionScale = false - column.precision = -1 - column.scale = -1 - } - - column.unifiedType = GetUnifiedType(column.systemType) - - colType := models.NewColumnType( - column.name, - column.hasMaxLength, - column.hasPrecisionScale, - column.userType, - column.systemType, - column.unifiedType, - column.nullable, - column.maxLength, - column.precision, - column.scale, - ) - - return colType -} - -func GetColumnTypesMssql(db *sql.DB, tableInfo config.SourceTableInfo) ([]models.ColumnType, error) { - query := ` -SELECT - c.name AS name, - t.name AS user_type, - CASE WHEN t.is_user_defined = 0 THEN t.name ELSE bt.name END AS system_type, - c.is_nullable AS nullable, - c.max_length AS max_length, - c.precision AS precision, - c.scale AS scale -FROM sys.columns c -JOIN sys.types t ON c.user_type_id = t.user_type_id -LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id -JOIN sys.tables st ON c.object_id = st.object_id -JOIN sys.schemas s ON st.schema_id = s.schema_id -WHERE s.name = @schema AND st.name = @table -ORDER BY c.column_id; -` - - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - rows, err := db.QueryContext(ctx, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)) - if err != nil { - return nil, fmt.Errorf("Error querying column types: %w", err) - } - defer rows.Close() - - var colTypes []models.ColumnType - - for rows.Next() { - var column ColumnType - - if err := rows.Scan( - &column.name, - &column.userType, - &column.systemType, - &column.nullable, - &column.maxLength, - &column.precision, - &column.scale, - ); err != nil { - return nil, fmt.Errorf("Error scanning column type results: %W", err) - } - - if strings.HasPrefix(column.name, "graph_id") && column.systemType == "bigint" { - continue - } - - colTypes = append(colTypes, MapMssqlColumn(column)) - } - - return colTypes, nil -} - -func GetColumnTypes( - sourceDb *sql.DB, - targetDb *pgxpool.Pool, - sourceTable config.SourceTableInfo, - targetTable config.TargetTableInfo, -) ([]models.ColumnType, []models.ColumnType, error) { - var sourceDbErr error - var targetDbErr error - var sourceColTypes []models.ColumnType - var targetColTypes []models.ColumnType - var wg sync.WaitGroup - - wg.Go(func() { - sourceColTypes, sourceDbErr = GetColumnTypesMssql(sourceDb, sourceTable) - if sourceDbErr != nil { - log.Error("Error (sourceDb): ", sourceDbErr) - } - }) - - wg.Go(func() { - targetColTypes, targetDbErr = GetColumnTypesPostgres(targetDb, targetTable) - if targetDbErr != nil { - log.Error("Error (targetDb): ", targetDbErr) - } - }) - - wg.Wait() - - if sourceDbErr != nil || targetDbErr != nil { - return nil, nil, errors.New("Error querying column types") - } - - return sourceColTypes, targetColTypes, nil -} diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 4253dd6..5e399fd 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -91,7 +91,8 @@ func processMigrationJobs( chJobs := make(chan config.Job, len(jobs)) var wgJobs sync.WaitGroup - tableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) + sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) + targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) extractor := extractors.NewMssqlExtractor(sourceDb) transformer := transformers.NewMssqlTransformer() loader := loaders.NewPostgresLoader(targetDb) @@ -102,9 +103,8 @@ func processMigrationJobs( log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) res := processMigrationJob( ctx, - sourceDb, - targetDb, - tableAnalyzer, + sourceTableAnalyzer, + targetTableAnalyzer, extractor, transformer, loader, diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 3a8b27a..9a2fac3 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -2,7 +2,6 @@ package main import ( "context" - "database/sql" "sync" "sync/atomic" "time" @@ -12,21 +11,22 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/jackc/pgx/v5/pgxpool" - _ "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) func processMigrationJob( ctx context.Context, - sourceDb *sql.DB, - targetDb *pgxpool.Pool, - tableAnalyzer etl.TableAnalyzer, + sourceTableAnalyzer etl.TableAnalyzer, + targetTableAnalyzer etl.TableAnalyzer, extractor etl.Extractor, transformer etl.Transformer, loader etl.Loader, job config.Job, ) JobResult { + jobCtx, cancel := context.WithCancel(ctx) + defer cancel() + result := JobResult{ JobName: job.Name, StartTime: time.Now(), @@ -34,21 +34,38 @@ func processMigrationJob( var rowsRead, rowsLoaded, rowsFailed int64 - sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable) + var wgQueryColumnTypes errgroup.Group + var sourceColTypes, targetColTypes []models.ColumnType + + wgQueryColumnTypes.Go(func() error { + var err error + sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(jobCtx, job.SourceTable.TableInfo) + if err != nil { + return err + } + + return nil + }) + + wgQueryColumnTypes.Go(func() error { + var err error + targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(jobCtx, job.TargetTable.TableInfo) + if err != nil { + return err + } + + return nil + }) + + err := wgQueryColumnTypes.Wait() if err != nil { result.Error = err return result } - logColumnTypes(sourceColTypes, "Source col types") - logColumnTypes(targetColTypes, "Target col types") - - jobCtx, cancel := context.WithCancel(ctx) - defer cancel() - partitions, err := table_analyzers.PartitionRangeGenerator( jobCtx, - tableAnalyzer, + sourceTableAnalyzer, job.SourceTable.TableInfo, job.SourceTable.PrimaryKey, job.RowsPerPartition, @@ -72,6 +89,7 @@ func processMigrationJob( go func() { if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil { + log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err) cancel() result.Error = err } @@ -140,25 +158,39 @@ func processMigrationJob( } go func() { + log.Debugf("Waiting for goroutines (%v)", job.Name) + wgActivePartitions.Wait() + log.Debugf("wgActivePartitions is empty (%v)", job.Name) close(chPartitions) + log.Debugf("chPartitions is closed (%v)", job.Name) close(chExtractorErrors) + log.Debugf("chExtractorErrors is closed (%v)", job.Name) wgExtractors.Wait() + log.Debugf("wgExtractors is empty (%v)", job.Name) close(chBatchesRaw) + log.Debugf("chBatchesRaw is closed (%v)", job.Name) wgTransformers.Wait() + log.Debugf("wgTransformers is empty (%v)", job.Name) wgActiveBatches.Wait() + log.Debugf("wgActiveBatches is empty (%v)", job.Name) close(chBatchesTransformed) + log.Debugf("chBatchesTransformed is empty (%v)", job.Name) close(chLoadersErrors) + log.Debugf("chLoadersErrors is empty (%v)", job.Name) wgLoaders.Wait() + log.Debugf("wgLoaders is empty (%v)", job.Name) cancel() }() + log.Debugf("waiting for local context to be done (%v)", job.Name) <-jobCtx.Done() + log.Debugf("local context done (%v)", job.Name) if ctx.Err() != nil { result.Error = ctx.Err() @@ -171,11 +203,3 @@ func processMigrationJob( return result } - -func logColumnTypes(columnTypes []models.ColumnType, label string) { - log.Debug(label) - - for _, col := range columnTypes { - log.Debugf("%+v", col) - } -} diff --git a/go.mod b/go.mod index af6b560..96170be 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/microsoft/go-mssqldb v1.9.8 github.com/sirupsen/logrus v1.9.4 github.com/twpayne/go-geom v1.6.1 + golang.org/x/sync v0.19.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -23,7 +24,6 @@ require ( github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/shopspring/decimal v1.4.0 // indirect golang.org/x/crypto v0.48.0 // indirect - golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.34.0 // indirect ) diff --git a/internal/app/etl/table_analyzers/mssql.go b/internal/app/etl/table_analyzers/mssql.go index 2981b11..cf8cd57 100644 --- a/internal/app/etl/table_analyzers/mssql.go +++ b/internal/app/etl/table_analyzers/mssql.go @@ -36,6 +36,7 @@ LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_t JOIN sys.tables st ON c.object_id = st.object_id JOIN sys.schemas s ON st.schema_id = s.schema_id WHERE s.name = @schema AND st.name = @table + AND c.name NOT LIKE 'graph_id%' ORDER BY c.column_id;` type rawColumnMssql struct {