1 Commits

25 changed files with 256 additions and 613 deletions

1
.gitignore vendored
View File

@@ -4,7 +4,6 @@
*.dll
*.so
*.dylib
bin/
# Test binary, built with `go test -c`
*.test

View File

@@ -1,80 +0,0 @@
.PHONY: build build-linux build-windows build-all clean help
# Variables
BINARY_NAME=go-migrate
CMD_PATH=./cmd/go_migrate
OUTPUT_DIR=bin
VERSION?=$(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
GIT_COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
# Flags de compilación
LD_FLAGS=-ldflags="-s -w -X main.Version=$(VERSION) -X main.BuildTime=$(BUILD_TIME) -X main.GitCommit=$(GIT_COMMIT)"
# Default: compilar para el SO actual
build: build-$(OS)
ifeq ($(OS),Windows_NT)
build-native: build-windows
else
build-native: build-linux
endif
# Compilar para Linux (sin CGO para máxima compatibilidad)
build-linux:
@echo "Compilando para Linux..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64"
# Compilar para Windows
build-windows:
@echo "Compilando para Windows..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe"
# Compilar para ambas plataformas
build-all: build-linux build-windows
@echo ""
@echo "Binarios compilados:"
@ls -lh $(OUTPUT_DIR)/$(BINARY_NAME)*
# Compilar para Linux arm64 (opcional, para Raspberry Pi, etc.)
build-linux-arm64:
@echo "Compilando para Linux ARM64..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64"
# Limpiar binarios
clean:
@echo "Limpiando binarios..."
@rm -rf $(OUTPUT_DIR)
@echo "Limpieza completada"
# Ayuda
help:
@echo "Comandos disponibles:"
@echo ""
@echo " make build - Compilar para el SO actual (Linux/Windows)"
@echo " make build-linux - Compilar para Linux x86_64"
@echo " make build-windows - Compilar para Windows x86_64"
@echo " make build-linux-arm64 - Compilar para Linux ARM64 (opcional)"
@echo " make build-all - Compilar para Linux y Windows"
@echo " make clean - Eliminar binarios compilados"
@echo " make help - Mostrar esta ayuda"
@echo ""
@echo "Ejemplos de uso:"
@echo " make build-all # Crear binarios para ambas plataformas"
@echo " make build-linux OS= # Crear solo para Linux"
@echo ""

77
cmd/go_migrate/connect.go Normal file
View File

@@ -0,0 +1,77 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func connectToSqlServer() (*sql.DB, error) {
db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
}
return db, nil
}
func connectToPostgres() (*pgxpool.Pool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
pool, err := pgxpool.New(ctx, config.App.TargetDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("Unable to ping postgres: %w", err)
}
return pool, nil
}
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
var sourceDbErr, targetDbErr error
var sourceDb *sql.DB
var targetDb *pgxpool.Pool
var wg sync.WaitGroup
wg.Go(func() {
sourceDb, sourceDbErr = connectToSqlServer()
if sourceDbErr != nil {
log.Error("Unable to connect to source db: ", sourceDbErr)
}
})
wg.Go(func() {
targetDb, targetDbErr = connectToPostgres()
if targetDbErr != nil {
log.Error("Unable to connect to target db: ", targetDbErr)
}
})
wg.Wait()
if sourceDbErr != nil || targetDbErr != nil {
return nil, nil, errors.New("Unable to connect to databases")
}
return sourceDb, targetDb, nil
}

View File

@@ -2,18 +2,18 @@ package main
import (
"context"
"database/sql"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
func main() {
@@ -33,33 +33,11 @@ func main() {
log.Info("=== Starting migration ===")
var wgConnect errgroup.Group
var sourceDb, targetDb dbwrapper.DbWrapper
wgConnect.Go(func() error {
var err error
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
wgConnect.Go(func() error {
var err error
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
if err := wgConnect.Wait(); err != nil {
log.Error("Connection error: ", err)
return
sourceDb, targetDb, connError := connectToDatabases()
if connError != nil {
log.Fatal("Connection error: ", connError)
}
defer sourceDb.Close()
defer targetDb.Close()
@@ -92,14 +70,14 @@ func main() {
func processMigrationJobs(
ctx context.Context,
sourceDb dbwrapper.DbWrapper,
targetDb dbwrapper.DbWrapper,
sourceDb *sql.DB,
targetDb *pgxpool.Pool,
jobs []config.Job,
maxParallelWorkers int,
) []models.JobResult {
) []JobResult {
if len(jobs) == 0 {
log.Info("No migration jobs configured")
return []models.JobResult{}
return []JobResult{}
}
if maxParallelWorkers <= 0 {
@@ -112,10 +90,11 @@ func processMigrationJobs(
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan models.JobResult, len(jobs))
chJobResults := make(chan JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup
targetDbWrapper := db.NewPostgresDbWrapper(targetDb)
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb)
@@ -128,7 +107,7 @@ func processMigrationJobs(
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
res := processMigrationJob(
ctx,
targetDb,
targetDbWrapper,
sourceTableAnalyzer,
targetTableAnalyzer,
extractor,
@@ -152,26 +131,10 @@ func processMigrationJobs(
close(chJobResults)
}()
var finalResults []models.JobResult
var finalResults []JobResult
for res := range chJobResults {
finalResults = append(finalResults, res)
}
return finalResults
}
func connectWithTimeout(ctx context.Context, dbType string, dbUrl string, timeout time.Duration) (dbwrapper.DbWrapper, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sourceDb, err := dbwrapper.New(dbType)
if err != nil {
return nil, err
}
if err = sourceDb.Connect(localCtx, dbUrl); err != nil {
return nil, err
}
return sourceDb, nil
}

13
cmd/go_migrate/metrics.go Normal file
View File

@@ -0,0 +1,13 @@
package main
import "time"
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}

View File

@@ -2,14 +2,13 @@ package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
@@ -19,18 +18,19 @@ import (
func processMigrationJob(
ctx context.Context,
targetDbWrapper dbwrapper.DbWrapper,
// sourceDbWrapper db.DbWrapper,
targetDbWrapper db.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor,
transformer etl.Transformer,
loader etl.Loader,
job config.Job,
) models.JobResult {
) JobResult {
localCtx, cancel := context.WithCancel(ctx)
defer cancel()
result := models.JobResult{
result := JobResult{
JobName: job.Name,
StartTime: time.Now(),
}
@@ -234,9 +234,5 @@ func processMigrationJob(
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
result.RowsFailed = atomic.LoadInt64(&rowsFailed)
if result.RowsRead != result.RowsLoaded {
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
}
return result
}

View File

@@ -30,12 +30,6 @@ jobs:
table: MANZANA
pre_sql:
- 'SELECT 1'
# - 'TRUNCATE TABLE "Cartografia"."MANZANA"'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto
enabled: true
@@ -48,6 +42,5 @@ jobs:
table: PUERTO
pre_sql:
- 'SELECT 1'
# - 'TRUNCATE TABLE "Red"."PUERTO"'
post_sql:
- "SELECT 1"

View File

@@ -50,12 +50,6 @@ type Job struct {
PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"`
JobConfig `yaml:",inline"`
Range struct {
Min int64 `yaml:"min"`
Max int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
}
type MigrationConfig struct {
@@ -82,8 +76,6 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
c.MaxParallelWorkers = raw.MaxParallelWorkers
c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs {

View File

@@ -103,8 +103,8 @@ func ExtractorErrorHandler(
if err.HasLastId {
newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New()
newPartition.Range.Min = err.LastId
newPartition.Range.IsMinInclusive = false
newPartition.LowerLimit = err.LastId
newPartition.IsLowerLimitInclusive = false
}
requeueWithBackoff(ctx, delay, func() {

View File

@@ -1,19 +0,0 @@
package dbwrapper
import "fmt"
type Factory func() DbWrapper
var drivers = make(map[string]Factory)
func Register(name string, factory Factory) {
drivers[name] = factory
}
func New(driverType string) (DbWrapper, error) {
factory, ok := drivers[driverType]
if !ok {
return nil, fmt.Errorf("driver not yet supported: %s", driverType)
}
return factory(), nil
}

View File

@@ -1,176 +0,0 @@
package dbwrapper
import (
"context"
"database/sql"
"fmt"
mssql "github.com/microsoft/go-mssqldb"
)
func init() {
Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
})
}
type mssqlRowResult struct {
row *sql.Row
}
func (mr *mssqlRowResult) Scan(dest ...any) error {
return mr.row.Scan(dest...)
}
type mssqlRowsResult struct {
columns []string
rows *sql.Rows
}
func (mr *mssqlRowsResult) Close() error {
return mr.rows.Close()
}
func (mr *mssqlRowsResult) Columns() ([]string, error) {
if mr.columns != nil {
return mr.columns, nil
}
return mr.rows.Columns()
}
func (mr *mssqlRowsResult) Err() error {
return mr.rows.Err()
}
func (mr *mssqlRowsResult) Next() bool {
return mr.rows.Next()
}
func (mr *mssqlRowsResult) Scan(dest ...any) error {
return mr.rows.Scan(dest...)
}
func (mr *mssqlRowsResult) Values() ([]any, error) {
columns, err := mr.Columns()
if err != nil {
return nil, err
}
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := mr.rows.Scan(scanArgs...); err != nil {
return nil, err
}
return rowValues, nil
}
type mssqlDbWrapper struct {
db *sql.DB
dialect string
}
func (mw *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error {
db, err := sql.Open("sqlserver", dbUrl)
if err != nil {
return err
}
if err := db.PingContext(ctx); err != nil {
if err := db.Close(); err != nil {
return err
}
return err
}
mw.db = db
return nil
}
func (mw *mssqlDbWrapper) Close() error {
return mw.db.Close()
}
func (mw *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, execErr := mw.db.ExecContext(ctx, query, args...)
if execErr != nil {
return ExecResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: affectedRows}, nil
}
func (mw *mssqlDbWrapper) GetDialect() string {
return mw.dialect
}
func (mw *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := mw.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &mssqlRowsResult{columns: nil, rows: rows}, nil
}
func (mw *mssqlDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := mw.db.QueryRowContext(ctx, query, args...)
return &mssqlRowResult{row: row}
}
func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
tx, err := mw.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
fullTableName := fmt.Sprintf("[%s].[%s]", schema, table)
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, columnNames...))
if err != nil {
tx.Rollback()
return 0, err
}
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
if err := stmt.Close(); err != nil {
tx.Rollback()
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
rowsAffected, raErr := result.RowsAffected()
if raErr != nil {
return 0, nil
}
return rowsAffected, nil
}

View File

@@ -1,128 +0,0 @@
package dbwrapper
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func init() {
Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
})
}
type postgresRowResult struct {
row pgx.Row
}
func (pr *postgresRowResult) Scan(dest ...any) error {
return pr.row.Scan(dest...)
}
type postgresRowsResult struct {
columns []string
rows pgx.Rows
}
func (pr *postgresRowsResult) Close() error {
pr.rows.Close()
return nil
}
func (pr *postgresRowsResult) Columns() ([]string, error) {
if pr.columns != nil {
return pr.columns, nil
}
rawColumns := pr.rows.FieldDescriptions()
if rawColumns == nil {
return nil, errors.New("error retrieving columns")
}
columns := make([]string, 0, len(rawColumns))
for _, rc := range rawColumns {
columns = append(columns, rc.Name)
}
return columns, nil
}
func (pr *postgresRowsResult) Err() error {
return pr.rows.Err()
}
func (pr *postgresRowsResult) Next() bool {
return pr.rows.Next()
}
func (pr *postgresRowsResult) Scan(dest ...any) error {
return pr.rows.Scan(dest...)
}
func (pr *postgresRowsResult) Values() ([]any, error) {
return pr.rows.Values()
}
type postgresDbWrapper struct {
db *pgxpool.Pool
dialect string
}
func (pw *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error {
pool, err := pgxpool.New(ctx, dbUrl)
if err != nil {
return err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return err
}
pw.db = pool
return nil
}
func (pw *postgresDbWrapper) Close() error {
pw.db.Close()
return nil
}
func (pw *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, err := pw.db.Exec(ctx, query, args...)
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: result.RowsAffected()}, nil
}
func (pw *postgresDbWrapper) GetDialect() string {
return pw.dialect
}
func (pw *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := pw.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
return &postgresRowsResult{columns: nil, rows: rows}, nil
}
func (pw *postgresDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := pw.db.QueryRow(ctx, query, args...)
return &postgresRowResult{row: row}
}
func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
affectedRows, err := pw.db.CopyFrom(ctx, pgx.Identifier{schema, table}, columnNames, pgx.CopyFromRows(rows))
if err != nil {
return 0, err
}
return affectedRows, nil
}

View File

@@ -1,35 +0,0 @@
package dbwrapper
import (
"context"
"errors"
)
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
type ExecResult struct {
AffectedRows int64
}
type RowsResult interface {
Close() error
Columns() ([]string, error)
Err() error
Next() bool
Scan(dest ...any) error
Values() ([]any, error)
}
type RowResult interface {
Scan(dest ...any) error
}
type DbWrapper interface {
Close() error
Connect(ctx context.Context, dbUrl string) error
Exec(ctx context.Context, query string, args ...any) (ExecResult, error)
GetDialect() string
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
}

30
internal/app/db/mssql.go Normal file
View File

@@ -0,0 +1,30 @@
package db
import (
"context"
"database/sql"
)
type MssqlDbWrapper struct {
db *sql.DB
}
func NewMssqlDbWrapper(db *sql.DB) DbWrapper {
return &MssqlDbWrapper{db: db}
}
func (wrapper *MssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, execErr := wrapper.db.ExecContext(ctx, query, args...)
if execErr != nil {
return DbWrapperResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: affectedRows,
}, nil
}

View File

@@ -0,0 +1,47 @@
package db
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
type PostgresDbWrapper struct {
db *pgxpool.Pool
}
func NewPostgresDbWrapper(db *pgxpool.Pool) DbWrapper {
return &PostgresDbWrapper{db: db}
}
func (wrapper *PostgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, err := wrapper.db.Exec(ctx, query, args...)
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: result.RowsAffected(),
}, nil
}

11
internal/app/db/types.go Normal file
View File

@@ -0,0 +1,11 @@
package db
import "context"
type DbWrapperResult struct {
AffectedRows int64
}
type DbWrapper interface {
Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error)
}

View File

@@ -13,17 +13,16 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type MssqlExtractor struct {
db dbwrapper.DbWrapper
db *sql.DB
}
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
func NewMssqlExtractor(db *sql.DB) etl.Extractor {
return &MssqlExtractor{db: db}
}
@@ -108,18 +107,18 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive)
var queryArgs []any
if partition.HasRange {
if partition.ShouldUseRange {
queryArgs = append(queryArgs,
sql.Named("min", partition.Range.Min),
sql.Named("max", partition.Range.Max),
sql.Named("min", partition.LowerLimit),
sql.Named("max", partition.UpperLimit),
)
}
rowsRead := 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}

View File

@@ -9,18 +9,18 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
)
type PostgresExtractor struct {
db dbwrapper.DbWrapper
db *pgxpool.Pool
}
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &PostgresExtractor{db: db}
func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor {
return &PostgresExtractor{db: pool}
}
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
@@ -62,7 +62,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
) (int, error) {
query := buildExtractQueryPostgres(tableInfo, columns)
if partition.HasRange {
if partition.ShouldUseRange {
return 0, errors.New("Batch config not yet supported")
}

View File

@@ -0,0 +1 @@
package extractors

View File

@@ -9,18 +9,19 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)
type PostgresLoader struct {
db dbwrapper.DbWrapper
db *pgxpool.Pool
}
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: db}
func NewPostgresLoader(pool *pgxpool.Pool) etl.Loader {
return &PostgresLoader{db: pool}
}
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
@@ -39,12 +40,12 @@ func (postgresLd *PostgresLoader) ProcessBatch(
colNames []string,
batch models.Batch,
) (int, error) {
_, err := postgresLd.db.SaveMassive(
tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
_, err := postgresLd.db.CopyFrom(
ctx,
tableInfo.Schema,
tableInfo.Table,
tableId,
colNames,
batch.Rows,
pgx.CopyFromRows(batch.Rows),
)
if err != nil {
@@ -53,7 +54,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
if pgErr.Code == "23505" {
return 0, &custom_errors.JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
Msg: fmt.Sprintf("Fatal error in table %s", tableId.Sanitize()),
Prev: err,
}
}

View File

@@ -23,9 +23,9 @@ func PartitionRangeGenerator(
if rowsCount <= rowsPerPartition {
return []models.Partition{{
Id: uuid.New(),
HasRange: false,
RetryCounter: 0,
Id: uuid.New(),
ShouldUseRange: false,
RetryCounter: 0,
}}, nil
}

View File

@@ -8,17 +8,16 @@ import (
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type MssqlTableAnalyzer struct {
db dbwrapper.DbWrapper
db *sql.DB
}
func NewMssqlTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
func NewMssqlTableAnalyzer(db *sql.DB) etl.TableAnalyzer {
return &MssqlTableAnalyzer{db: db}
}
@@ -36,7 +35,7 @@ JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
WHERE s.name = @schema AND st.name = @table AND c.name NOT LIKE 'graph_id%'
ORDER BY c.column_id;`
type rawColumnMssql struct {
@@ -143,7 +142,7 @@ func (ta *MssqlTableAnalyzer) QueryColumnTypes(
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
rows, err := ta.db.Query(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
rows, err := ta.db.QueryContext(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
if err != nil {
return nil, err
}
@@ -184,11 +183,11 @@ JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name`
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
var rowsCount int64
err := ta.db.QueryRow(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
err := ta.db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
if err != nil {
return 0, err
}
@@ -216,10 +215,10 @@ ORDER BY batch_id`,
tableInfo.Schema,
tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
rows, err := ta.db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
if err != nil {
return nil, err
}
@@ -229,15 +228,13 @@ ORDER BY batch_id`,
for rows.Next() {
partition := models.Partition{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
IsMinInclusive: true,
},
Id: uuid.New(),
ShouldUseRange: true,
RetryCounter: 0,
IsLowerLimitInclusive: true,
}
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
if err := rows.Scan(&partition.LowerLimit, &partition.UpperLimit); err != nil {
return nil, err
}

View File

@@ -6,16 +6,16 @@ import (
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
)
type PostgresTableAnalyzer struct {
db dbwrapper.DbWrapper
db *pgxpool.Pool
}
func NewPostgresTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
func NewPostgresTableAnalyzer(db *pgxpool.Pool) etl.TableAnalyzer {
return &PostgresTableAnalyzer{db: db}
}
@@ -125,7 +125,7 @@ func (ta *PostgresTableAnalyzer) QueryColumnTypes(
ctx context.Context,
tableInfo config.TableInfo,
) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table)

View File

@@ -1,10 +1,6 @@
package models
import (
"time"
"github.com/google/uuid"
)
import "github.com/google/uuid"
type UnknownRowValues = []any
@@ -15,27 +11,12 @@ type Batch struct {
RetryCounter int
}
type PartitionRange struct {
Min int64
Max int64
IsMinInclusive bool
IsMaxInclusive bool
}
type Partition struct {
Id uuid.UUID
ParentId uuid.UUID
Range PartitionRange
HasRange bool
RetryCounter int
}
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
Id uuid.UUID
ParentId uuid.UUID
LowerLimit int64
UpperLimit int64
IsLowerLimitInclusive bool
ShouldUseRange bool
RetryCounter int
}

View File

@@ -8,32 +8,13 @@ import (
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
func main() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
@@ -46,8 +27,8 @@ func main() {
ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
sourcePool, err := Connect(ctxSource, config.App.SourceDbUrl)
defer Close(sourcePool)
sourcePool, err := db.Connect(ctxSource, config.App.SourceDbUrl)
defer db.Close(sourcePool)
if err != nil {
log.Fatal(err)
}
@@ -56,8 +37,8 @@ func main() {
ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
targetPool, err := Connect(ctxTarget, config.App.TargetDbUrl)
defer Close(targetPool)
targetPool, err := db.Connect(ctxTarget, config.App.TargetDbUrl)
defer db.Close(targetPool)
if err != nil {
log.Fatal(err)
}