Files
go-migrate/cmd/go_migrate/main.go

126 lines
3.0 KiB
Go

package main
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
// MigrationJob names one table to process. Schema and Table are interpolated
// verbatim into the FROM clause of the metadata queries issued against both
// the source and the target database.
type MigrationJob struct {
	Schema, Table string
}
// migrationJobs is the list of tables that main iterates over, in order.
var migrationJobs = []MigrationJob{
	{Schema: "Cartografia", Table: "MANZANA"},
}
// main configures logging, obtains the source and target database handles
// from connectToDatabases and, for every entry in migrationJobs, queries the
// column types of both tables and logs them at debug level. Any failure ends
// the process through log.Fatal.
func main() {
	configureLog()
	log.Info("Starting migration...")
	log.Debugf("Migration jobs: %+v", migrationJobs)

	sourceDb, targetDb, connError := connectToDatabases()
	if connError != nil {
		log.Fatal("Connection error: ", connError)
	}

	// log.Fatal terminates the process without running deferred calls, so the
	// per-job work runs inside a closure: its deferred Close calls execute
	// when the closure returns, before any error is reported below.
	err := func() error {
		defer sourceDb.Close()
		defer targetDb.Close()

		for _, job := range migrationJobs {
			sourceColTypes, targetColTypes, err := queryColumnTypes(sourceDb, targetDb, job)
			if err != nil {
				return err
			}
			log.Debugf("Source col types: %+v", sourceColTypes)
			log.Debugf("Target col types: %+v", targetColTypes)
		}
		return nil
	}()
	if err != nil {
		log.Fatal("Unexpected error: ", err)
	}

	log.Info("Migration completed successfully!")
}
// querySourceColTypes returns the column types of the source table named by
// migrationJob. It issues a SELECT whose WHERE clause is always false and
// reads the column types from the resulting (empty) row set. The query is
// bounded by a 20-second timeout.
//
// A query failure is wrapped with a short prefix; an error from ColumnTypes
// is returned unchanged.
func querySourceColTypes(db *sql.DB, migrationJob MigrationJob) ([]sql.ColumnType, error) {
	stmt := fmt.Sprintf(`SELECT * FROM [%s].[%s] WHERE 0 = 1`, migrationJob.Schema, migrationJob.Table)

	timeoutCtx, stop := context.WithTimeout(context.Background(), 20*time.Second)
	defer stop()

	rows, queryErr := db.QueryContext(timeoutCtx, stmt)
	if queryErr != nil {
		return nil, fmt.Errorf("Error querying column types: %w", queryErr)
	}
	defer rows.Close()

	ptrs, typesErr := rows.ColumnTypes()
	if typesErr != nil {
		return nil, typesErr
	}

	// Copy each pointed-to ColumnType into the value slice the signature returns.
	result := make([]sql.ColumnType, len(ptrs))
	for i := range ptrs {
		result[i] = *ptrs[i]
	}
	return result, nil
}
// queryTargetColTypes returns the field descriptions of the target table
// named by migrationJob. It issues a SELECT whose WHERE clause is always
// false and reads the field descriptions from the resulting row set. The
// query is bounded by a 20-second timeout.
//
// The returned slice is a copy owned by the caller. An error is returned if
// the query fails, either immediately or as reported by rows.Err after the
// rows are closed.
func queryTargetColTypes(db *pgxpool.Pool, migrationJob MigrationJob) ([]pgconn.FieldDescription, error) {
	query := fmt.Sprintf(`SELECT * FROM "%s"."%s" WHERE 0 = 1`, migrationJob.Schema, migrationJob.Table)

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	rows, err := db.Query(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("Error querying column types: %w", err)
	}
	// Safety net for any early exit; Close is called explicitly below as well.
	defer rows.Close()

	// pgconn documents the FieldDescriptions slice as valid only until the
	// result reader is closed, so take a copy before closing the rows.
	colTypes := append([]pgconn.FieldDescription(nil), rows.FieldDescriptions()...)

	// pgx may surface query failures only through rows.Err once the rows are
	// closed, so close explicitly and check it before trusting the result.
	rows.Close()
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("Error querying column types: %w", err)
	}
	return colTypes, nil
}
// queryColumnTypes looks up the column metadata of migrationJob's table in
// the source and the target database concurrently.
//
// On success it returns the source column types and the target field
// descriptions. If either lookup fails, both slices are nil and the returned
// error wraps every failure that occurred. Each failure is also logged at
// error level by the goroutine that hit it.
func queryColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob MigrationJob) ([]sql.ColumnType, []pgconn.FieldDescription, error) {
	var (
		sourceColTypes []sql.ColumnType
		targetColTypes []pgconn.FieldDescription
		sourceDbErr    error
		targetDbErr    error
	)

	// Each goroutine writes only its own result/error pair, and those
	// variables are read only after wg.Wait returns.
	var wg sync.WaitGroup
	wg.Go(func() {
		sourceColTypes, sourceDbErr = querySourceColTypes(sourceDb, migrationJob)
		if sourceDbErr != nil {
			log.Error("Error (sourceDb): ", sourceDbErr)
		}
	})
	wg.Go(func() {
		targetColTypes, targetDbErr = queryTargetColTypes(targetDb, migrationJob)
		if targetDbErr != nil {
			log.Error("Error (targetDb): ", targetDbErr)
		}
	})
	wg.Wait()

	// %w must not be given a nil error (fmt renders it as "%!w(<nil>)"), so
	// only the errors that actually occurred are formatted into the message.
	switch {
	case sourceDbErr != nil && targetDbErr != nil:
		return nil, nil, fmt.Errorf("Unable to connect to databases: %w (source), %w (target)", sourceDbErr, targetDbErr)
	case sourceDbErr != nil:
		return nil, nil, fmt.Errorf("Unable to connect to databases: %w (source)", sourceDbErr)
	case targetDbErr != nil:
		return nil, nil, fmt.Errorf("Unable to connect to databases: %w (target)", targetDbErr)
	}
	return sourceColTypes, targetColTypes, nil
}