9 Commits

20 changed files with 476 additions and 173 deletions

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@
*.dll *.dll
*.so *.so
*.dylib *.dylib
bin/
# Test binary, built with `go test -c` # Test binary, built with `go test -c`
*.test *.test

80
Makefile Normal file
View File

@@ -0,0 +1,80 @@
.PHONY: build build-linux build-windows build-all clean help
# Variables
BINARY_NAME=go-migrate
CMD_PATH=./cmd/go_migrate
OUTPUT_DIR=bin
VERSION?=$(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
GIT_COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
# Flags de compilación
LD_FLAGS=-ldflags="-s -w -X main.Version=$(VERSION) -X main.BuildTime=$(BUILD_TIME) -X main.GitCommit=$(GIT_COMMIT)"
# Default: compilar para el SO actual
build: build-$(OS)
ifeq ($(OS),Windows_NT)
build-native: build-windows
else
build-native: build-linux
endif
# Compilar para Linux (sin CGO para máxima compatibilidad)
build-linux:
@echo "Compilando para Linux..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64"
# Compilar para Windows
build-windows:
@echo "Compilando para Windows..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe"
# Compilar para ambas plataformas
build-all: build-linux build-windows
@echo ""
@echo "Binarios compilados:"
@ls -lh $(OUTPUT_DIR)/$(BINARY_NAME)*
# Compilar para Linux arm64 (opcional, para Raspberry Pi, etc.)
build-linux-arm64:
@echo "Compilando para Linux ARM64..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64"
# Limpiar binarios
clean:
@echo "Limpiando binarios..."
@rm -rf $(OUTPUT_DIR)
@echo "Limpieza completada"
# Ayuda
help:
@echo "Comandos disponibles:"
@echo ""
@echo " make build - Compilar para el SO actual (Linux/Windows)"
@echo " make build-linux - Compilar para Linux x86_64"
@echo " make build-windows - Compilar para Windows x86_64"
@echo " make build-linux-arm64 - Compilar para Linux ARM64 (opcional)"
@echo " make build-all - Compilar para Linux y Windows"
@echo " make clean - Eliminar binarios compilados"
@echo " make help - Mostrar esta ayuda"
@echo ""
@echo "Ejemplos de uso:"
@echo " make build-all # Crear binarios para ambas plataformas"
@echo " make build-linux OS= # Crear solo para Linux"
@echo ""

View File

@@ -2,18 +2,17 @@ package main
import ( import (
"context" "context"
"database/sql"
"sync" "sync"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
) )
func main() { func main() {
@@ -33,11 +32,33 @@ func main() {
log.Info("=== Starting migration ===") log.Info("=== Starting migration ===")
sourceDb, targetDb, connError := connectToDatabases() var wgConnect errgroup.Group
if connError != nil { var sourceDb, targetDb dbwrapper.DbWrapper
log.Fatal("Connection error: ", connError)
}
wgConnect.Go(func() error {
var err error
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
wgConnect.Go(func() error {
var err error
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
if err := wgConnect.Wait(); err != nil {
log.Error("Connection error: ", err)
return
}
defer sourceDb.Close() defer sourceDb.Close()
defer targetDb.Close() defer targetDb.Close()
@@ -70,8 +91,8 @@ func main() {
func processMigrationJobs( func processMigrationJobs(
ctx context.Context, ctx context.Context,
sourceDb *sql.DB, sourceDb dbwrapper.DbWrapper,
targetDb *pgxpool.Pool, targetDb dbwrapper.DbWrapper,
jobs []config.Job, jobs []config.Job,
maxParallelWorkers int, maxParallelWorkers int,
) []JobResult { ) []JobResult {
@@ -94,7 +115,6 @@ func processMigrationJobs(
chJobs := make(chan config.Job, len(jobs)) chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup var wgJobs sync.WaitGroup
targetDbWrapper := db.NewPostgresDbWrapper(targetDb)
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
@@ -107,7 +127,7 @@ func processMigrationJobs(
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
res := processMigrationJob( res := processMigrationJob(
ctx, ctx,
targetDbWrapper, targetDb,
sourceTableAnalyzer, sourceTableAnalyzer,
targetTableAnalyzer, targetTableAnalyzer,
extractor, extractor,
@@ -138,3 +158,19 @@ func processMigrationJobs(
return finalResults return finalResults
} }
func connectWithTimeout(ctx context.Context, dbType string, dbUrl string, timeout time.Duration) (dbwrapper.DbWrapper, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sourceDb, err := dbwrapper.New(dbType)
if err != nil {
return nil, err
}
if err = sourceDb.Connect(localCtx, dbUrl); err != nil {
return nil, err
}
return sourceDb, nil
}

View File

@@ -2,13 +2,14 @@ package main
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
@@ -18,8 +19,7 @@ import (
func processMigrationJob( func processMigrationJob(
ctx context.Context, ctx context.Context,
// sourceDbWrapper db.DbWrapper, targetDbWrapper dbwrapper.DbWrapper,
targetDbWrapper db.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer, sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer, targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor, extractor etl.Extractor,
@@ -234,5 +234,9 @@ func processMigrationJob(
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded) result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
result.RowsFailed = atomic.LoadInt64(&rowsFailed) result.RowsFailed = atomic.LoadInt64(&rowsFailed)
if result.RowsRead != result.RowsLoaded {
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
}
return result return result
} }

View File

@@ -30,6 +30,7 @@ jobs:
table: MANZANA table: MANZANA
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
# - 'TRUNCATE TABLE "Cartografia"."MANZANA"'
range: range:
min: 1000000 min: 1000000
max: 2000000 max: 2000000
@@ -47,5 +48,6 @@ jobs:
table: PUERTO table: PUERTO
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
# - 'TRUNCATE TABLE "Red"."PUERTO"'
post_sql: post_sql:
- "SELECT 1" - "SELECT 1"

View File

@@ -82,6 +82,8 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
c.MaxParallelWorkers = raw.MaxParallelWorkers c.MaxParallelWorkers = raw.MaxParallelWorkers
c.Defaults = raw.Defaults c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition) c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs { for _, node := range raw.Jobs {

View File

@@ -2,13 +2,18 @@ package dbwrapper
import "fmt" import "fmt"
func NewWrapper(driverType string) (DbWrapper, error) { type Factory func() DbWrapper
switch driverType {
case "postgres": var drivers = make(map[string]Factory)
return &postgresDbWrapper{}, nil
case "sqlserver": func Register(name string, factory Factory) {
return &mssqlDbWrapper{}, nil drivers[name] = factory
default: }
func New(driverType string) (DbWrapper, error) {
factory, ok := drivers[driverType]
if !ok {
return nil, fmt.Errorf("driver not yet supported: %s", driverType) return nil, fmt.Errorf("driver not yet supported: %s", driverType)
} }
return factory(), nil
} }

View File

@@ -3,18 +3,101 @@ package dbwrapper
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
mssql "github.com/microsoft/go-mssqldb"
) )
type mssqlDbWrapper struct { func init() {
db *sql.DB Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
})
} }
func (wrapper *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error { return nil } type mssqlRowResult struct {
row *sql.Row
}
func (wrapper *mssqlDbWrapper) Close() error { return nil } func (mr *mssqlRowResult) Scan(dest ...any) error {
return mr.row.Scan(dest...)
}
func (wrapper *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) { type mssqlRowsResult struct {
result, execErr := wrapper.db.ExecContext(ctx, query, args...) columns []string
rows *sql.Rows
}
func (mr *mssqlRowsResult) Close() error {
return mr.rows.Close()
}
func (mr *mssqlRowsResult) Columns() ([]string, error) {
if mr.columns != nil {
return mr.columns, nil
}
return mr.rows.Columns()
}
func (mr *mssqlRowsResult) Err() error {
return mr.rows.Err()
}
func (mr *mssqlRowsResult) Next() bool {
return mr.rows.Next()
}
func (mr *mssqlRowsResult) Scan(dest ...any) error {
return mr.rows.Scan(dest...)
}
func (mr *mssqlRowsResult) Values() ([]any, error) {
columns, err := mr.Columns()
if err != nil {
return nil, err
}
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := mr.rows.Scan(scanArgs...); err != nil {
return nil, err
}
return rowValues, nil
}
type mssqlDbWrapper struct {
db *sql.DB
dialect string
}
func (mw *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error {
db, err := sql.Open("sqlserver", dbUrl)
if err != nil {
return err
}
if err := db.PingContext(ctx); err != nil {
if err := db.Close(); err != nil {
return err
}
return err
}
mw.db = db
return nil
}
func (mw *mssqlDbWrapper) Close() error {
return mw.db.Close()
}
func (mw *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, execErr := mw.db.ExecContext(ctx, query, args...)
if execErr != nil { if execErr != nil {
return ExecResult{}, execErr return ExecResult{}, execErr
} }
@@ -24,15 +107,70 @@ func (wrapper *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...a
return ExecResult{}, err return ExecResult{}, err
} }
return ExecResult{ return ExecResult{AffectedRows: affectedRows}, nil
AffectedRows: affectedRows,
}, nil
} }
func (wrapper *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) { func (mw *mssqlDbWrapper) GetDialect() string {
return nil, nil return mw.dialect
} }
func (wrapper *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) { func (mw *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
return 0, nil rows, err := mw.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &mssqlRowsResult{columns: nil, rows: rows}, nil
}
func (mw *mssqlDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := mw.db.QueryRowContext(ctx, query, args...)
return &mssqlRowResult{row: row}
}
func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
tx, err := mw.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
fullTableName := fmt.Sprintf("[%s].[%s]", schema, table)
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, columnNames...))
if err != nil {
tx.Rollback()
return 0, err
}
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
if err := stmt.Close(); err != nil {
tx.Rollback()
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
rowsAffected, raErr := result.RowsAffected()
if raErr != nil {
return 0, nil
}
return rowsAffected, nil
} }

View File

@@ -2,33 +2,127 @@ package dbwrapper
import ( import (
"context" "context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
) )
type postgresDbWrapper struct { func init() {
db *pgxpool.Pool Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
})
} }
func (wrapper *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error { return nil } type postgresRowResult struct {
row pgx.Row
}
func (wrapper *postgresDbWrapper) Close() error { return nil } func (pr *postgresRowResult) Scan(dest ...any) error {
return pr.row.Scan(dest...)
}
func (wrapper *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) { type postgresRowsResult struct {
result, err := wrapper.db.Exec(ctx, query, args...) columns []string
rows pgx.Rows
}
func (pr *postgresRowsResult) Close() error {
pr.rows.Close()
return nil
}
func (pr *postgresRowsResult) Columns() ([]string, error) {
if pr.columns != nil {
return pr.columns, nil
}
rawColumns := pr.rows.FieldDescriptions()
if rawColumns == nil {
return nil, errors.New("error retrieving columns")
}
columns := make([]string, 0, len(rawColumns))
for _, rc := range rawColumns {
columns = append(columns, rc.Name)
}
return columns, nil
}
func (pr *postgresRowsResult) Err() error {
return pr.rows.Err()
}
func (pr *postgresRowsResult) Next() bool {
return pr.rows.Next()
}
func (pr *postgresRowsResult) Scan(dest ...any) error {
return pr.rows.Scan(dest...)
}
func (pr *postgresRowsResult) Values() ([]any, error) {
return pr.rows.Values()
}
type postgresDbWrapper struct {
db *pgxpool.Pool
dialect string
}
func (pw *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error {
pool, err := pgxpool.New(ctx, dbUrl)
if err != nil {
return err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return err
}
pw.db = pool
return nil
}
func (pw *postgresDbWrapper) Close() error {
pw.db.Close()
return nil
}
func (pw *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, err := pw.db.Exec(ctx, query, args...)
if err != nil { if err != nil {
return ExecResult{}, err return ExecResult{}, err
} }
return ExecResult{ return ExecResult{AffectedRows: result.RowsAffected()}, nil
AffectedRows: result.RowsAffected(),
}, nil
} }
func (wrapper *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) { func (pw *postgresDbWrapper) GetDialect() string {
return nil, nil return pw.dialect
} }
func (wrapper *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) { func (pw *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
return 0, nil rows, err := pw.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
return &postgresRowsResult{columns: nil, rows: rows}, nil
}
func (pw *postgresDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := pw.db.QueryRow(ctx, query, args...)
return &postgresRowResult{row: row}
}
func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
affectedRows, err := pw.db.CopyFrom(ctx, pgx.Identifier{schema, table}, columnNames, pgx.CopyFromRows(rows))
if err != nil {
return 0, err
}
return affectedRows, nil
} }

View File

@@ -2,24 +2,34 @@ package dbwrapper
import ( import (
"context" "context"
"errors"
) )
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
type ExecResult struct { type ExecResult struct {
AffectedRows int64 AffectedRows int64
} }
type RowsResult interface { type RowsResult interface {
Close() Close() error
Columns() ([]string, error)
Err() error Err() error
Next() bool Next() bool
Scan(dest ...any) error
Values() ([]any, error) Values() ([]any, error)
Columns() ([]string, error) }
type RowResult interface {
Scan(dest ...any) error
} }
type DbWrapper interface { type DbWrapper interface {
Connect(ctx context.Context, dbUrl string) error
Close() error Close() error
Connect(ctx context.Context, dbUrl string) error
Exec(ctx context.Context, query string, args ...any) (ExecResult, error) Exec(ctx context.Context, query string, args ...any) (ExecResult, error)
GetDialect() string
Query(ctx context.Context, query string, args ...any) (RowsResult, error) Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
} }

View File

@@ -1,30 +0,0 @@
package db
import (
"context"
"database/sql"
)
type MssqlDbWrapper struct {
db *sql.DB
}
func NewMssqlDbWrapper(db *sql.DB) DbWrapper {
return &MssqlDbWrapper{db: db}
}
func (wrapper *MssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, execErr := wrapper.db.ExecContext(ctx, query, args...)
if execErr != nil {
return DbWrapperResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: affectedRows,
}, nil
}

View File

@@ -1,47 +0,0 @@
package db
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
type PostgresDbWrapper struct {
db *pgxpool.Pool
}
func NewPostgresDbWrapper(db *pgxpool.Pool) DbWrapper {
return &PostgresDbWrapper{db: db}
}
func (wrapper *PostgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, err := wrapper.db.Exec(ctx, query, args...)
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: result.RowsAffected(),
}, nil
}

View File

@@ -1,11 +0,0 @@
package db
import "context"
type DbWrapperResult struct {
AffectedRows int64
}
type DbWrapper interface {
Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error)
}

View File

@@ -13,16 +13,17 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
type MssqlExtractor struct { type MssqlExtractor struct {
db *sql.DB db dbwrapper.DbWrapper
} }
func NewMssqlExtractor(db *sql.DB) etl.Extractor { func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &MssqlExtractor{db: db} return &MssqlExtractor{db: db}
} }
@@ -118,7 +119,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
} }
rowsRead := 0 rowsRead := 0
rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...) rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
if err != nil { if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
} }

View File

@@ -9,18 +9,18 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresExtractor struct { type PostgresExtractor struct {
db *pgxpool.Pool db dbwrapper.DbWrapper
} }
func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor { func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &PostgresExtractor{db: pool} return &PostgresExtractor{db: db}
} }
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string { func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {

View File

@@ -1 +0,0 @@
package extractors

View File

@@ -9,19 +9,18 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresLoader struct { type PostgresLoader struct {
db *pgxpool.Pool db dbwrapper.DbWrapper
} }
func NewPostgresLoader(pool *pgxpool.Pool) etl.Loader { func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: pool} return &PostgresLoader{db: db}
} }
func mapSlice[T any, V any](input []T, mapper func(T) V) []V { func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
@@ -40,12 +39,12 @@ func (postgresLd *PostgresLoader) ProcessBatch(
colNames []string, colNames []string,
batch models.Batch, batch models.Batch,
) (int, error) { ) (int, error) {
tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} _, err := postgresLd.db.SaveMassive(
_, err := postgresLd.db.CopyFrom(
ctx, ctx,
tableId, tableInfo.Schema,
tableInfo.Table,
colNames, colNames,
pgx.CopyFromRows(batch.Rows), batch.Rows,
) )
if err != nil { if err != nil {
@@ -54,7 +53,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
if pgErr.Code == "23505" { if pgErr.Code == "23505" {
return 0, &custom_errors.JobError{ return 0, &custom_errors.JobError{
ShouldCancelJob: true, ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s", tableId.Sanitize()), Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
Prev: err, Prev: err,
} }
} }

View File

@@ -8,16 +8,17 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
type MssqlTableAnalyzer struct { type MssqlTableAnalyzer struct {
db *sql.DB db dbwrapper.DbWrapper
} }
func NewMssqlTableAnalyzer(db *sql.DB) etl.TableAnalyzer { func NewMssqlTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
return &MssqlTableAnalyzer{db: db} return &MssqlTableAnalyzer{db: db}
} }
@@ -35,7 +36,7 @@ JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND c.name NOT LIKE 'graph_id%' WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;` ORDER BY c.column_id;`
type rawColumnMssql struct { type rawColumnMssql struct {
@@ -142,7 +143,7 @@ func (ta *MssqlTableAnalyzer) QueryColumnTypes(
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second) localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel() defer cancel()
rows, err := ta.db.QueryContext(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)) rows, err := ta.db.Query(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -183,11 +184,11 @@ JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1) WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name` GROUP BY t.name`
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20) ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel() defer cancel()
var rowsCount int64 var rowsCount int64
err := ta.db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount) err := ta.db.QueryRow(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -215,10 +216,10 @@ ORDER BY batch_id`,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table) tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20) ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel() defer cancel()
rows, err := ta.db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -6,16 +6,16 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresTableAnalyzer struct { type PostgresTableAnalyzer struct {
db *pgxpool.Pool db dbwrapper.DbWrapper
} }
func NewPostgresTableAnalyzer(db *pgxpool.Pool) etl.TableAnalyzer { func NewPostgresTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
return &PostgresTableAnalyzer{db: db} return &PostgresTableAnalyzer{db: db}
} }
@@ -125,7 +125,7 @@ func (ta *PostgresTableAnalyzer) QueryColumnTypes(
ctx context.Context, ctx context.Context,
tableInfo config.TableInfo, tableInfo config.TableInfo,
) ([]models.ColumnType, error) { ) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second) localCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel() defer cancel()
rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table) rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table)

View File

@@ -8,13 +8,32 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
func main() { func main() {
log.SetFormatter(&log.TextFormatter{ log.SetFormatter(&log.TextFormatter{
FullTimestamp: true, FullTimestamp: true,
@@ -27,8 +46,8 @@ func main() {
ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second) ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel() defer cancel()
sourcePool, err := db.Connect(ctxSource, config.App.SourceDbUrl) sourcePool, err := Connect(ctxSource, config.App.SourceDbUrl)
defer db.Close(sourcePool) defer Close(sourcePool)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -37,8 +56,8 @@ func main() {
ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second) ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel() defer cancel()
targetPool, err := db.Connect(ctxTarget, config.App.TargetDbUrl) targetPool, err := Connect(ctxTarget, config.App.TargetDbUrl)
defer db.Close(targetPool) defer Close(targetPool)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }