1 Commits

25 changed files with 256 additions and 613 deletions

1
.gitignore vendored
View File

@@ -4,7 +4,6 @@
*.dll *.dll
*.so *.so
*.dylib *.dylib
bin/
# Test binary, built with `go test -c` # Test binary, built with `go test -c`
*.test *.test

View File

@@ -1,80 +0,0 @@
.PHONY: build build-linux build-windows build-all clean help
# Variables
BINARY_NAME=go-migrate
CMD_PATH=./cmd/go_migrate
OUTPUT_DIR=bin
VERSION?=$(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
GIT_COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
# Flags de compilación
LD_FLAGS=-ldflags="-s -w -X main.Version=$(VERSION) -X main.BuildTime=$(BUILD_TIME) -X main.GitCommit=$(GIT_COMMIT)"
# Default: compilar para el SO actual
build: build-$(OS)
ifeq ($(OS),Windows_NT)
build-native: build-windows
else
build-native: build-linux
endif
# Compilar para Linux (sin CGO para máxima compatibilidad)
build-linux:
@echo "Compilando para Linux..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64"
# Compilar para Windows
build-windows:
@echo "Compilando para Windows..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe"
# Compilar para ambas plataformas
build-all: build-linux build-windows
@echo ""
@echo "Binarios compilados:"
@ls -lh $(OUTPUT_DIR)/$(BINARY_NAME)*
# Compilar para Linux arm64 (opcional, para Raspberry Pi, etc.)
build-linux-arm64:
@echo "Compilando para Linux ARM64..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64"
# Limpiar binarios
clean:
@echo "Limpiando binarios..."
@rm -rf $(OUTPUT_DIR)
@echo "Limpieza completada"
# Ayuda
help:
@echo "Comandos disponibles:"
@echo ""
@echo " make build - Compilar para el SO actual (Linux/Windows)"
@echo " make build-linux - Compilar para Linux x86_64"
@echo " make build-windows - Compilar para Windows x86_64"
@echo " make build-linux-arm64 - Compilar para Linux ARM64 (opcional)"
@echo " make build-all - Compilar para Linux y Windows"
@echo " make clean - Eliminar binarios compilados"
@echo " make help - Mostrar esta ayuda"
@echo ""
@echo "Ejemplos de uso:"
@echo " make build-all # Crear binarios para ambas plataformas"
@echo " make build-linux OS= # Crear solo para Linux"
@echo ""

77
cmd/go_migrate/connect.go Normal file
View File

@@ -0,0 +1,77 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func connectToSqlServer() (*sql.DB, error) {
db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
}
return db, nil
}
func connectToPostgres() (*pgxpool.Pool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
pool, err := pgxpool.New(ctx, config.App.TargetDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("Unable to ping postgres: %w", err)
}
return pool, nil
}
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
var sourceDbErr, targetDbErr error
var sourceDb *sql.DB
var targetDb *pgxpool.Pool
var wg sync.WaitGroup
wg.Go(func() {
sourceDb, sourceDbErr = connectToSqlServer()
if sourceDbErr != nil {
log.Error("Unable to connect to source db: ", sourceDbErr)
}
})
wg.Go(func() {
targetDb, targetDbErr = connectToPostgres()
if targetDbErr != nil {
log.Error("Unable to connect to target db: ", targetDbErr)
}
})
wg.Wait()
if sourceDbErr != nil || targetDbErr != nil {
return nil, nil, errors.New("Unable to connect to databases")
}
return sourceDb, targetDb, nil
}

View File

@@ -2,18 +2,18 @@ package main
import ( import (
"context" "context"
"database/sql"
"sync" "sync"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
) )
func main() { func main() {
@@ -33,33 +33,11 @@ func main() {
log.Info("=== Starting migration ===") log.Info("=== Starting migration ===")
var wgConnect errgroup.Group sourceDb, targetDb, connError := connectToDatabases()
var sourceDb, targetDb dbwrapper.DbWrapper if connError != nil {
log.Fatal("Connection error: ", connError)
wgConnect.Go(func() error {
var err error
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
wgConnect.Go(func() error {
var err error
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
if err := wgConnect.Wait(); err != nil {
log.Error("Connection error: ", err)
return
} }
defer sourceDb.Close() defer sourceDb.Close()
defer targetDb.Close() defer targetDb.Close()
@@ -92,14 +70,14 @@ func main() {
func processMigrationJobs( func processMigrationJobs(
ctx context.Context, ctx context.Context,
sourceDb dbwrapper.DbWrapper, sourceDb *sql.DB,
targetDb dbwrapper.DbWrapper, targetDb *pgxpool.Pool,
jobs []config.Job, jobs []config.Job,
maxParallelWorkers int, maxParallelWorkers int,
) []models.JobResult { ) []JobResult {
if len(jobs) == 0 { if len(jobs) == 0 {
log.Info("No migration jobs configured") log.Info("No migration jobs configured")
return []models.JobResult{} return []JobResult{}
} }
if maxParallelWorkers <= 0 { if maxParallelWorkers <= 0 {
@@ -112,10 +90,11 @@ func processMigrationJobs(
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers) log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan models.JobResult, len(jobs)) chJobResults := make(chan JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs)) chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup var wgJobs sync.WaitGroup
targetDbWrapper := db.NewPostgresDbWrapper(targetDb)
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
@@ -128,7 +107,7 @@ func processMigrationJobs(
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
res := processMigrationJob( res := processMigrationJob(
ctx, ctx,
targetDb, targetDbWrapper,
sourceTableAnalyzer, sourceTableAnalyzer,
targetTableAnalyzer, targetTableAnalyzer,
extractor, extractor,
@@ -152,26 +131,10 @@ func processMigrationJobs(
close(chJobResults) close(chJobResults)
}() }()
var finalResults []models.JobResult var finalResults []JobResult
for res := range chJobResults { for res := range chJobResults {
finalResults = append(finalResults, res) finalResults = append(finalResults, res)
} }
return finalResults return finalResults
} }
func connectWithTimeout(ctx context.Context, dbType string, dbUrl string, timeout time.Duration) (dbwrapper.DbWrapper, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sourceDb, err := dbwrapper.New(dbType)
if err != nil {
return nil, err
}
if err = sourceDb.Connect(localCtx, dbUrl); err != nil {
return nil, err
}
return sourceDb, nil
}

13
cmd/go_migrate/metrics.go Normal file
View File

@@ -0,0 +1,13 @@
package main
import "time"
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}

View File

@@ -2,14 +2,13 @@ package main
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
@@ -19,18 +18,19 @@ import (
func processMigrationJob( func processMigrationJob(
ctx context.Context, ctx context.Context,
targetDbWrapper dbwrapper.DbWrapper, // sourceDbWrapper db.DbWrapper,
targetDbWrapper db.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer, sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer, targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor, extractor etl.Extractor,
transformer etl.Transformer, transformer etl.Transformer,
loader etl.Loader, loader etl.Loader,
job config.Job, job config.Job,
) models.JobResult { ) JobResult {
localCtx, cancel := context.WithCancel(ctx) localCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
result := models.JobResult{ result := JobResult{
JobName: job.Name, JobName: job.Name,
StartTime: time.Now(), StartTime: time.Now(),
} }
@@ -234,9 +234,5 @@ func processMigrationJob(
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded) result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
result.RowsFailed = atomic.LoadInt64(&rowsFailed) result.RowsFailed = atomic.LoadInt64(&rowsFailed)
if result.RowsRead != result.RowsLoaded {
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
}
return result return result
} }

View File

@@ -30,12 +30,6 @@ jobs:
table: MANZANA table: MANZANA
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
# - 'TRUNCATE TABLE "Cartografia"."MANZANA"'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto - name: red_puerto
enabled: true enabled: true
@@ -48,6 +42,5 @@ jobs:
table: PUERTO table: PUERTO
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
# - 'TRUNCATE TABLE "Red"."PUERTO"'
post_sql: post_sql:
- "SELECT 1" - "SELECT 1"

View File

@@ -50,12 +50,6 @@ type Job struct {
PreSQL []string `yaml:"pre_sql"` PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"` PostSQL []string `yaml:"post_sql"`
JobConfig `yaml:",inline"` JobConfig `yaml:",inline"`
Range struct {
Min int64 `yaml:"min"`
Max int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
} }
type MigrationConfig struct { type MigrationConfig struct {
@@ -82,8 +76,6 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
c.MaxParallelWorkers = raw.MaxParallelWorkers c.MaxParallelWorkers = raw.MaxParallelWorkers
c.Defaults = raw.Defaults c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition) c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs { for _, node := range raw.Jobs {

View File

@@ -103,8 +103,8 @@ func ExtractorErrorHandler(
if err.HasLastId { if err.HasLastId {
newPartition.ParentId = err.Partition.Id newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New() newPartition.Id = uuid.New()
newPartition.Range.Min = err.LastId newPartition.LowerLimit = err.LastId
newPartition.Range.IsMinInclusive = false newPartition.IsLowerLimitInclusive = false
} }
requeueWithBackoff(ctx, delay, func() { requeueWithBackoff(ctx, delay, func() {

View File

@@ -1,19 +0,0 @@
package dbwrapper
import "fmt"
type Factory func() DbWrapper
var drivers = make(map[string]Factory)
func Register(name string, factory Factory) {
drivers[name] = factory
}
func New(driverType string) (DbWrapper, error) {
factory, ok := drivers[driverType]
if !ok {
return nil, fmt.Errorf("driver not yet supported: %s", driverType)
}
return factory(), nil
}

View File

@@ -1,176 +0,0 @@
package dbwrapper
import (
"context"
"database/sql"
"fmt"
mssql "github.com/microsoft/go-mssqldb"
)
func init() {
Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
})
}
type mssqlRowResult struct {
row *sql.Row
}
func (mr *mssqlRowResult) Scan(dest ...any) error {
return mr.row.Scan(dest...)
}
type mssqlRowsResult struct {
columns []string
rows *sql.Rows
}
func (mr *mssqlRowsResult) Close() error {
return mr.rows.Close()
}
func (mr *mssqlRowsResult) Columns() ([]string, error) {
if mr.columns != nil {
return mr.columns, nil
}
return mr.rows.Columns()
}
func (mr *mssqlRowsResult) Err() error {
return mr.rows.Err()
}
func (mr *mssqlRowsResult) Next() bool {
return mr.rows.Next()
}
func (mr *mssqlRowsResult) Scan(dest ...any) error {
return mr.rows.Scan(dest...)
}
func (mr *mssqlRowsResult) Values() ([]any, error) {
columns, err := mr.Columns()
if err != nil {
return nil, err
}
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := mr.rows.Scan(scanArgs...); err != nil {
return nil, err
}
return rowValues, nil
}
type mssqlDbWrapper struct {
db *sql.DB
dialect string
}
func (mw *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error {
db, err := sql.Open("sqlserver", dbUrl)
if err != nil {
return err
}
if err := db.PingContext(ctx); err != nil {
if err := db.Close(); err != nil {
return err
}
return err
}
mw.db = db
return nil
}
func (mw *mssqlDbWrapper) Close() error {
return mw.db.Close()
}
func (mw *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, execErr := mw.db.ExecContext(ctx, query, args...)
if execErr != nil {
return ExecResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: affectedRows}, nil
}
func (mw *mssqlDbWrapper) GetDialect() string {
return mw.dialect
}
func (mw *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := mw.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &mssqlRowsResult{columns: nil, rows: rows}, nil
}
func (mw *mssqlDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := mw.db.QueryRowContext(ctx, query, args...)
return &mssqlRowResult{row: row}
}
func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
tx, err := mw.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
fullTableName := fmt.Sprintf("[%s].[%s]", schema, table)
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, columnNames...))
if err != nil {
tx.Rollback()
return 0, err
}
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
if err := stmt.Close(); err != nil {
tx.Rollback()
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
rowsAffected, raErr := result.RowsAffected()
if raErr != nil {
return 0, nil
}
return rowsAffected, nil
}

View File

@@ -1,128 +0,0 @@
package dbwrapper
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func init() {
Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
})
}
type postgresRowResult struct {
row pgx.Row
}
func (pr *postgresRowResult) Scan(dest ...any) error {
return pr.row.Scan(dest...)
}
type postgresRowsResult struct {
columns []string
rows pgx.Rows
}
func (pr *postgresRowsResult) Close() error {
pr.rows.Close()
return nil
}
func (pr *postgresRowsResult) Columns() ([]string, error) {
if pr.columns != nil {
return pr.columns, nil
}
rawColumns := pr.rows.FieldDescriptions()
if rawColumns == nil {
return nil, errors.New("error retrieving columns")
}
columns := make([]string, 0, len(rawColumns))
for _, rc := range rawColumns {
columns = append(columns, rc.Name)
}
return columns, nil
}
func (pr *postgresRowsResult) Err() error {
return pr.rows.Err()
}
func (pr *postgresRowsResult) Next() bool {
return pr.rows.Next()
}
func (pr *postgresRowsResult) Scan(dest ...any) error {
return pr.rows.Scan(dest...)
}
func (pr *postgresRowsResult) Values() ([]any, error) {
return pr.rows.Values()
}
type postgresDbWrapper struct {
db *pgxpool.Pool
dialect string
}
func (pw *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error {
pool, err := pgxpool.New(ctx, dbUrl)
if err != nil {
return err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return err
}
pw.db = pool
return nil
}
func (pw *postgresDbWrapper) Close() error {
pw.db.Close()
return nil
}
func (pw *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, err := pw.db.Exec(ctx, query, args...)
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: result.RowsAffected()}, nil
}
func (pw *postgresDbWrapper) GetDialect() string {
return pw.dialect
}
func (pw *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := pw.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
return &postgresRowsResult{columns: nil, rows: rows}, nil
}
func (pw *postgresDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := pw.db.QueryRow(ctx, query, args...)
return &postgresRowResult{row: row}
}
func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
affectedRows, err := pw.db.CopyFrom(ctx, pgx.Identifier{schema, table}, columnNames, pgx.CopyFromRows(rows))
if err != nil {
return 0, err
}
return affectedRows, nil
}

View File

@@ -1,35 +0,0 @@
package dbwrapper
import (
"context"
"errors"
)
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
type ExecResult struct {
AffectedRows int64
}
type RowsResult interface {
Close() error
Columns() ([]string, error)
Err() error
Next() bool
Scan(dest ...any) error
Values() ([]any, error)
}
type RowResult interface {
Scan(dest ...any) error
}
type DbWrapper interface {
Close() error
Connect(ctx context.Context, dbUrl string) error
Exec(ctx context.Context, query string, args ...any) (ExecResult, error)
GetDialect() string
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
}

30
internal/app/db/mssql.go Normal file
View File

@@ -0,0 +1,30 @@
package db
import (
"context"
"database/sql"
)
type MssqlDbWrapper struct {
db *sql.DB
}
func NewMssqlDbWrapper(db *sql.DB) DbWrapper {
return &MssqlDbWrapper{db: db}
}
func (wrapper *MssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, execErr := wrapper.db.ExecContext(ctx, query, args...)
if execErr != nil {
return DbWrapperResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: affectedRows,
}, nil
}

View File

@@ -0,0 +1,47 @@
package db
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
type PostgresDbWrapper struct {
db *pgxpool.Pool
}
func NewPostgresDbWrapper(db *pgxpool.Pool) DbWrapper {
return &PostgresDbWrapper{db: db}
}
func (wrapper *PostgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, err := wrapper.db.Exec(ctx, query, args...)
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: result.RowsAffected(),
}, nil
}

11
internal/app/db/types.go Normal file
View File

@@ -0,0 +1,11 @@
package db
import "context"
type DbWrapperResult struct {
AffectedRows int64
}
type DbWrapper interface {
Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error)
}

View File

@@ -13,17 +13,16 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
type MssqlExtractor struct { type MssqlExtractor struct {
db dbwrapper.DbWrapper db *sql.DB
} }
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor { func NewMssqlExtractor(db *sql.DB) etl.Extractor {
return &MssqlExtractor{db: db} return &MssqlExtractor{db: db}
} }
@@ -108,18 +107,18 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int, error) { ) (int, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive) query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive)
var queryArgs []any var queryArgs []any
if partition.HasRange { if partition.ShouldUseRange {
queryArgs = append(queryArgs, queryArgs = append(queryArgs,
sql.Named("min", partition.Range.Min), sql.Named("min", partition.LowerLimit),
sql.Named("max", partition.Range.Max), sql.Named("max", partition.UpperLimit),
) )
} }
rowsRead := 0 rowsRead := 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...) rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...)
if err != nil { if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
} }

View File

@@ -9,18 +9,18 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresExtractor struct { type PostgresExtractor struct {
db dbwrapper.DbWrapper db *pgxpool.Pool
} }
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor { func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor {
return &PostgresExtractor{db: db} return &PostgresExtractor{db: pool}
} }
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string { func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
@@ -62,7 +62,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
) (int, error) { ) (int, error) {
query := buildExtractQueryPostgres(tableInfo, columns) query := buildExtractQueryPostgres(tableInfo, columns)
if partition.HasRange { if partition.ShouldUseRange {
return 0, errors.New("Batch config not yet supported") return 0, errors.New("Batch config not yet supported")
} }

View File

@@ -0,0 +1 @@
package extractors

View File

@@ -9,18 +9,19 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresLoader struct { type PostgresLoader struct {
db dbwrapper.DbWrapper db *pgxpool.Pool
} }
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader { func NewPostgresLoader(pool *pgxpool.Pool) etl.Loader {
return &PostgresLoader{db: db} return &PostgresLoader{db: pool}
} }
func mapSlice[T any, V any](input []T, mapper func(T) V) []V { func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
@@ -39,12 +40,12 @@ func (postgresLd *PostgresLoader) ProcessBatch(
colNames []string, colNames []string,
batch models.Batch, batch models.Batch,
) (int, error) { ) (int, error) {
_, err := postgresLd.db.SaveMassive( tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
_, err := postgresLd.db.CopyFrom(
ctx, ctx,
tableInfo.Schema, tableId,
tableInfo.Table,
colNames, colNames,
batch.Rows, pgx.CopyFromRows(batch.Rows),
) )
if err != nil { if err != nil {
@@ -53,7 +54,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
if pgErr.Code == "23505" { if pgErr.Code == "23505" {
return 0, &custom_errors.JobError{ return 0, &custom_errors.JobError{
ShouldCancelJob: true, ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table), Msg: fmt.Sprintf("Fatal error in table %s", tableId.Sanitize()),
Prev: err, Prev: err,
} }
} }

View File

@@ -23,9 +23,9 @@ func PartitionRangeGenerator(
if rowsCount <= rowsPerPartition { if rowsCount <= rowsPerPartition {
return []models.Partition{{ return []models.Partition{{
Id: uuid.New(), Id: uuid.New(),
HasRange: false, ShouldUseRange: false,
RetryCounter: 0, RetryCounter: 0,
}}, nil }}, nil
} }

View File

@@ -8,17 +8,16 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
type MssqlTableAnalyzer struct { type MssqlTableAnalyzer struct {
db dbwrapper.DbWrapper db *sql.DB
} }
func NewMssqlTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer { func NewMssqlTableAnalyzer(db *sql.DB) etl.TableAnalyzer {
return &MssqlTableAnalyzer{db: db} return &MssqlTableAnalyzer{db: db}
} }
@@ -36,7 +35,7 @@ JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%')) WHERE s.name = @schema AND st.name = @table AND c.name NOT LIKE 'graph_id%'
ORDER BY c.column_id;` ORDER BY c.column_id;`
type rawColumnMssql struct { type rawColumnMssql struct {
@@ -143,7 +142,7 @@ func (ta *MssqlTableAnalyzer) QueryColumnTypes(
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second) localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel() defer cancel()
rows, err := ta.db.Query(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)) rows, err := ta.db.QueryContext(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -184,11 +183,11 @@ JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1) WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name` GROUP BY t.name`
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel() defer cancel()
var rowsCount int64 var rowsCount int64
err := ta.db.QueryRow(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount) err := ta.db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -216,10 +215,10 @@ ORDER BY batch_id`,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table) tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel() defer cancel()
rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) rows, err := ta.db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -229,15 +228,13 @@ ORDER BY batch_id`,
for rows.Next() { for rows.Next() {
partition := models.Partition{ partition := models.Partition{
Id: uuid.New(), Id: uuid.New(),
HasRange: true, ShouldUseRange: true,
RetryCounter: 0, RetryCounter: 0,
Range: models.PartitionRange{ IsLowerLimitInclusive: true,
IsMinInclusive: true,
},
} }
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil { if err := rows.Scan(&partition.LowerLimit, &partition.UpperLimit); err != nil {
return nil, err return nil, err
} }

View File

@@ -6,16 +6,16 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresTableAnalyzer struct { type PostgresTableAnalyzer struct {
db dbwrapper.DbWrapper db *pgxpool.Pool
} }
func NewPostgresTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer { func NewPostgresTableAnalyzer(db *pgxpool.Pool) etl.TableAnalyzer {
return &PostgresTableAnalyzer{db: db} return &PostgresTableAnalyzer{db: db}
} }
@@ -125,7 +125,7 @@ func (ta *PostgresTableAnalyzer) QueryColumnTypes(
ctx context.Context, ctx context.Context,
tableInfo config.TableInfo, tableInfo config.TableInfo,
) ([]models.ColumnType, error) { ) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel() defer cancel()
rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table) rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table)

View File

@@ -1,10 +1,6 @@
package models package models
import ( import "github.com/google/uuid"
"time"
"github.com/google/uuid"
)
type UnknownRowValues = []any type UnknownRowValues = []any
@@ -15,27 +11,12 @@ type Batch struct {
RetryCounter int RetryCounter int
} }
type PartitionRange struct {
Min int64
Max int64
IsMinInclusive bool
IsMaxInclusive bool
}
type Partition struct { type Partition struct {
Id uuid.UUID Id uuid.UUID
ParentId uuid.UUID ParentId uuid.UUID
Range PartitionRange LowerLimit int64
HasRange bool UpperLimit int64
RetryCounter int IsLowerLimitInclusive bool
} ShouldUseRange bool
RetryCounter int
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
} }

View File

@@ -8,32 +8,13 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
func main() { func main() {
log.SetFormatter(&log.TextFormatter{ log.SetFormatter(&log.TextFormatter{
FullTimestamp: true, FullTimestamp: true,
@@ -46,8 +27,8 @@ func main() {
ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second) ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel() defer cancel()
sourcePool, err := Connect(ctxSource, config.App.SourceDbUrl) sourcePool, err := db.Connect(ctxSource, config.App.SourceDbUrl)
defer Close(sourcePool) defer db.Close(sourcePool)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -56,8 +37,8 @@ func main() {
ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second) ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel() defer cancel()
targetPool, err := Connect(ctxTarget, config.App.TargetDbUrl) targetPool, err := db.Connect(ctxTarget, config.App.TargetDbUrl)
defer Close(targetPool) defer db.Close(targetPool)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }