feat: refactor database connection logic and add column type querying functions
This commit is contained in:
@@ -46,29 +46,31 @@ func connectToPostgres() (*pgxpool.Pool, error) {
|
|||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func connectToDatabases(sourceDb *sql.DB, targetDb *pgxpool.Pool) error {
|
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
|
||||||
var sourceDbErr, targetDbErr error
|
var sourceDbErr, targetDbErr error
|
||||||
|
var sourceDb *sql.DB
|
||||||
|
var targetDb *pgxpool.Pool
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
sourceDb, sourceDbErr = connectToSqlServer()
|
sourceDb, sourceDbErr = connectToSqlServer()
|
||||||
if sourceDbErr != nil {
|
if sourceDbErr != nil {
|
||||||
log.Error("Unable to connect to source db: ", sourceDbErr)
|
log.Error("Unable to connect to source db: ", sourceDbErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
targetDb, targetDbErr = connectToPostgres()
|
targetDb, targetDbErr = connectToPostgres()
|
||||||
if targetDbErr != nil {
|
if targetDbErr != nil {
|
||||||
log.Error("Unable to connect to target db: ", targetDbErr)
|
log.Error("Unable to connect to target db: ", targetDbErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
if sourceDbErr != nil || targetDbErr != nil {
|
if sourceDbErr != nil || targetDbErr != nil {
|
||||||
return fmt.Errorf("Unable to connect to databases: %w (source), %w (target)", sourceDbErr, targetDbErr)
|
return nil, nil, fmt.Errorf("Unable to connect to databases: %w (source), %w (target)", sourceDbErr, targetDbErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return sourceDb, targetDb, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,14 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
_ "github.com/microsoft/go-mssqldb"
|
_ "github.com/microsoft/go-mssqldb"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@@ -22,13 +29,97 @@ var migrationJobs []MigrationJob = []MigrationJob{
|
|||||||
func main() {
|
func main() {
|
||||||
configureLog()
|
configureLog()
|
||||||
log.Info("Starting migration...")
|
log.Info("Starting migration...")
|
||||||
|
|
||||||
var sourceDb *sql.DB
|
|
||||||
var targetDb *pgxpool.Pool
|
|
||||||
|
|
||||||
connectToDatabases(sourceDb, targetDb)
|
|
||||||
|
|
||||||
log.Debugf("Migration jobs: %+v", migrationJobs)
|
log.Debugf("Migration jobs: %+v", migrationJobs)
|
||||||
|
|
||||||
|
sourceDb, targetDb, connError := connectToDatabases()
|
||||||
|
if connError != nil {
|
||||||
|
log.Fatal("Connection error: ", connError)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer sourceDb.Close()
|
||||||
|
defer targetDb.Close()
|
||||||
|
|
||||||
|
for _, job := range migrationJobs {
|
||||||
|
sourceColTypes, targetColTypes, err := queryColumnTypes(sourceDb, targetDb, job)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Unexpected error: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Source col types: %+v", sourceColTypes)
|
||||||
|
log.Debugf("Target col types: %+v", targetColTypes)
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("Migration completed successfully!")
|
log.Info("Migration completed successfully!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func querySourceColTypes(db *sql.DB, migrationJob MigrationJob) ([]sql.ColumnType, error) {
|
||||||
|
query := fmt.Sprintf(`SELECT * FROM [%s].[%s] WHERE 0 = 1`, migrationJob.Schema, migrationJob.Table)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
rows, err := db.QueryContext(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Error querying column types: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
colTypesPointers, err := rows.ColumnTypes()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
colTypes := make([]sql.ColumnType, 0, len(colTypesPointers))
|
||||||
|
for _, c := range colTypesPointers {
|
||||||
|
colTypes = append(colTypes, *c)
|
||||||
|
}
|
||||||
|
|
||||||
|
return colTypes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func queryTargetColTypes(db *pgxpool.Pool, migrationJob MigrationJob) ([]pgconn.FieldDescription, error) {
|
||||||
|
query := fmt.Sprintf(`SELECT * FROM "%s"."%s" WHERE 0 = 1`, migrationJob.Schema, migrationJob.Table)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
rows, err := db.Query(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Error querying column types: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
colTypes := rows.FieldDescriptions()
|
||||||
|
|
||||||
|
return colTypes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func queryColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob MigrationJob) ([]sql.ColumnType, []pgconn.FieldDescription, error) {
|
||||||
|
var sourceDbErr error
|
||||||
|
var targetDbErr error
|
||||||
|
var sourceColTypes []sql.ColumnType
|
||||||
|
var targetColTypes []pgconn.FieldDescription
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wg.Go(func() {
|
||||||
|
sourceColTypes, sourceDbErr = querySourceColTypes(sourceDb, migrationJob)
|
||||||
|
if sourceDbErr != nil {
|
||||||
|
log.Error("Error (sourceDb): ", sourceDbErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
wg.Go(func() {
|
||||||
|
targetColTypes, targetDbErr = queryTargetColTypes(targetDb, migrationJob)
|
||||||
|
if targetDbErr != nil {
|
||||||
|
log.Error("Error (targetDb): ", targetDbErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if sourceDbErr != nil || targetDbErr != nil {
|
||||||
|
return nil, nil, fmt.Errorf("Unable to connect to databases: %w (source), %w (target)", sourceDbErr, targetDbErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sourceColTypes, targetColTypes, nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user