diff --git a/cmd/go_migrate/connect.go b/cmd/go_migrate/connect.go index e9e8f23..eb31411 100644 --- a/cmd/go_migrate/connect.go +++ b/cmd/go_migrate/connect.go @@ -46,29 +46,31 @@ func connectToPostgres() (*pgxpool.Pool, error) { return pool, nil } -func connectToDatabases(sourceDb *sql.DB, targetDb *pgxpool.Pool) error { +func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) { var sourceDbErr, targetDbErr error + var sourceDb *sql.DB + var targetDb *pgxpool.Pool var wg sync.WaitGroup - + wg.Go(func() { - sourceDb, sourceDbErr = connectToSqlServer() - if sourceDbErr != nil { - log.Error("Unable to connect to source db: ", sourceDbErr) - } + sourceDb, sourceDbErr = connectToSqlServer() + if sourceDbErr != nil { + log.Error("Unable to connect to source db: ", sourceDbErr) + } }) wg.Go(func() { - targetDb, targetDbErr = connectToPostgres() - if targetDbErr != nil { - log.Error("Unable to connect to target db: ", targetDbErr) - } + targetDb, targetDbErr = connectToPostgres() + if targetDbErr != nil { + log.Error("Unable to connect to target db: ", targetDbErr) + } }) wg.Wait() if sourceDbErr != nil || targetDbErr != nil { - return fmt.Errorf("Unable to connect to databases: %w (source), %w (target)", sourceDbErr, targetDbErr) + return nil, nil, fmt.Errorf("Unable to connect to databases: %w (source), %w (target)", sourceDbErr, targetDbErr) } - return nil + return sourceDb, targetDb, nil } diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 8154363..c4734a6 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -1,7 +1,14 @@ package main import ( + "context" "database/sql" + "fmt" + + "sync" + "time" + + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" @@ -22,13 +29,97 @@ var migrationJobs []MigrationJob = []MigrationJob{ func main() { configureLog() log.Info("Starting migration...") - - var sourceDb *sql.DB - var targetDb *pgxpool.Pool - - connectToDatabases(sourceDb, targetDb) - log.Debugf("Migration jobs: %+v", migrationJobs) + sourceDb, targetDb, connError := connectToDatabases() + if connError != nil { + log.Fatal("Connection error: ", connError) + } + + defer sourceDb.Close() + defer targetDb.Close() + + for _, job := range migrationJobs { + sourceColTypes, targetColTypes, err := queryColumnTypes(sourceDb, targetDb, job) + if err != nil { + log.Fatal("Unexpected error: ", err) + } + + log.Debugf("Source col types: %+v", sourceColTypes) + log.Debugf("Target col types: %+v", targetColTypes) + } + log.Info("Migration completed successfully!") } + +func querySourceColTypes(db *sql.DB, migrationJob MigrationJob) ([]sql.ColumnType, error) { + query := fmt.Sprintf(`SELECT * FROM [%s].[%s] WHERE 0 = 1`, migrationJob.Schema, migrationJob.Table) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + rows, err := db.QueryContext(ctx, query) + if err != nil { + return nil, fmt.Errorf("Error querying column types: %w", err) + } + defer rows.Close() + + colTypesPointers, err := rows.ColumnTypes() + if err != nil { + return nil, err + } + + colTypes := make([]sql.ColumnType, 0, len(colTypesPointers)) + for _, c := range colTypesPointers { + colTypes = append(colTypes, *c) + } + + return colTypes, nil +} + +func queryTargetColTypes(db *pgxpool.Pool, migrationJob MigrationJob) ([]pgconn.FieldDescription, error) { + query := fmt.Sprintf(`SELECT * FROM "%s"."%s" WHERE 0 = 1`, migrationJob.Schema, migrationJob.Table) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + rows, err := db.Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("Error querying column types: %w", err) + } + defer rows.Close() + + colTypes := rows.FieldDescriptions() + + return colTypes, nil +} + +func queryColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob MigrationJob) ([]sql.ColumnType, []pgconn.FieldDescription, error) { + var sourceDbErr error + var targetDbErr error + var sourceColTypes []sql.ColumnType + var targetColTypes []pgconn.FieldDescription + var wg sync.WaitGroup + + wg.Go(func() { + sourceColTypes, sourceDbErr = querySourceColTypes(sourceDb, migrationJob) + if sourceDbErr != nil { + log.Error("Error (sourceDb): ", sourceDbErr) + } + }) + + wg.Go(func() { + targetColTypes, targetDbErr = queryTargetColTypes(targetDb, migrationJob) + if targetDbErr != nil { + log.Error("Error (targetDb): ", targetDbErr) + } + }) + + wg.Wait() + + if sourceDbErr != nil || targetDbErr != nil { + return nil, nil, fmt.Errorf("Unable to connect to databases: %w (source), %w (target)", sourceDbErr, targetDbErr) + } + + return sourceColTypes, targetColTypes, nil +}