32 Commits

Author SHA1 Message Date
6ad25e5889 feat: update job configuration for infraestructura_site_holder__attach; enhance storage handling and retry logic 2026-04-22 15:23:19 -05:00
0ac5f01b65 feat: update configuration handling to use cleanenv; remove unused dependencies and improve error logging 2026-04-22 12:31:31 -05:00
3b1371a270 feat: integrate Azure storage handling in migration process; update transformers and job processing logic 2026-04-21 14:15:20 -05:00
bb7b35619a feat: add ToStorage configuration to JobConfig for enhanced data handling 2026-04-21 13:43:03 -05:00
cd2efb8692 feat: add new job configuration for red_terminal__attach with source and target definitions 2026-04-21 13:38:11 -05:00
9964ef819b feat: refactor migration job to use preSQL and postSQL from TargetTable; update config structure for job definitions 2026-04-21 12:30:40 -05:00
bd51223855 feat: add truncate query handling in migration process; update job configuration to remove commented truncate SQL 2026-04-21 11:40:02 -05:00
aa71eeb5c1 feat: enhance range handling in MSSQL and Postgres extractors; update partition range generator logic 2026-04-21 11:32:52 -05:00
9eb8800864 feat: add range configuration to job and update extractors for inclusive range handling 2026-04-21 11:29:34 -05:00
09bd364976 feat: implement Azure Blob Storage client and refactor configuration structure 2026-04-21 11:03:56 -05:00
f2e6edd8fa feat: add caarlos0/env package for environment variable management and refactor appConfig structure 2026-04-21 10:41:38 -05:00
c5a18f6d95 chore: update .env.example to set LOG_LEVEL to INFO and add Azure storage configuration variables 2026-04-21 10:15:01 -05:00
8217b13d08 feat: add azblob client implementation for file uploads 2026-04-21 10:01:42 -05:00
16c232762e feat: refactor extractor interface and implement Consume function for ETL process 2026-04-20 00:19:41 -05:00
b4a846575d Update .env.example with new variables 2026-04-19 23:08:09 -05:00
5bd730f026 feat: update .gitignore to include .vscode directory 2026-04-19 23:06:18 -05:00
7bd80d4180 feat: enhance logging configuration to use dynamic log level from environment variable 2026-04-19 23:06:13 -05:00
846a49d40c feat: implement GenericLoader for batch processing and utility functions 2026-04-17 15:58:08 -05:00
93b302db8e feat: refactor job result handling and remove unused files 2026-04-16 16:47:35 -05:00
39c0d99502 feat: extend context timeout to 1 minute for database queries in mssql.go and postgres.go 2026-04-16 13:19:46 -05:00
b418ded78b feat: update SQL query to filter out specific column names in mssql.go 2026-04-16 13:14:39 -05:00
0d0511716f feat: add row count mismatch error handling in processMigrationJob and update SQL query to exclude additional graph-related columns 2026-04-16 12:46:55 -05:00
67fb0148ae feat: add Makefile for building binaries across platforms 2026-04-16 11:51:05 -05:00
098cf36e3c feat: comment out TRUNCATE statements in pre_sql for MANZANA and PUERTO jobs 2026-04-16 11:46:24 -05:00
5484716b81 feat: add source and target database type fields to MigrationConfig 2026-04-16 09:08:21 -05:00
df4c3bc390 feat: refactor db handling to use db-wrapper package; enhance connection management and result handling for MSSQL and Postgres 2026-04-16 08:48:29 -05:00
ea41a7c218 feat: register MSSQL and Postgres drivers in db-wrapper for improved factory pattern support 2026-04-15 23:09:56 -05:00
f09284ecdc feat: enhance db-wrapper with improved MSSQL and Postgres implementations; add row result handling and dialect support 2026-04-15 22:55:14 -05:00
0384d5423f feat: add range configuration to job settings for enhanced data processing control 2026-04-15 20:23:45 -05:00
1ce3d9e153 refactor: update partition handling to use Range struct for better clarity and consistency 2026-04-15 20:23:45 -05:00
DiegoAlessandroMotta
ed889b740a add db-wrapper package types 2026-04-15 20:22:23 -05:00
803f8988b8 refactor: update extractor interfaces to return row counts instead of using pointers for rows read 2026-04-13 19:25:18 -05:00
38 changed files with 1252 additions and 498 deletions

24
.atl/skill-registry.md Normal file
View File

@@ -0,0 +1,24 @@
# Skill Registry — go-migrate
Generated: 2026-04-21
## Compact Rules
### Go conventions
- Use existing error wrapping pattern: `fmt.Errorf("context: %w", err)`
- Channel-based pipeline — keep goroutine lifecycle clean (close channels in correct order)
- No comments unless non-obvious WHY; no docstrings
- Prefer named returns only when it aids clarity in short functions
- Use `strings.EqualFold` for case-insensitive column name comparison
### Project conventions
- Config structs live in `internal/app/config/`
- ETL interfaces live in `internal/app/etl/types.go`
- Transformer implementations in `internal/app/etl/transformers/`
- Azure operations via `internal/app/azure/main.go`
- Per-job transformer creation (not shared) when job has storage config
## User Skills
| Trigger | Skill |
|---------|-------|
| sdd-* | SDD workflow skills |

View File

@@ -1,2 +1,12 @@
PG_FROM_DB_URL=postgresql://postgres:password@localhost:5432/db SOURCE_DB_URL=sqlserver://sa:password@localhost:1433?database=master&packet+size=32767&loc=UTC
PG_TO_DB_URL=postgresql://postgres:password@localhost:5432/db TARGET_DB_URL=postgresql://postgres:password@localhost:5432/db
LOG_LEVEL=INFO
AZ_STORAGE_ENABLED=false
AZ_ACCOUNT_NAME=
AZ_CONTAINER=
AZ_ACCOUNT_KEY=
AZ_USE_HTTPS=true
AZ_SERVICE_URL=
AZ_PREFIX=

3
.gitignore vendored
View File

@@ -4,6 +4,7 @@
*.dll *.dll
*.so *.so
*.dylib *.dylib
bin/
# Test binary, built with `go test -c` # Test binary, built with `go test -c`
*.test *.test
@@ -26,5 +27,5 @@ go.work.sum
# Editor/IDE # Editor/IDE
# .idea/ # .idea/
# .vscode/ .vscode/
.temp .temp

80
Makefile Normal file
View File

@@ -0,0 +1,80 @@
.PHONY: build build-linux build-windows build-all clean help
# Variables
BINARY_NAME=go-migrate
CMD_PATH=./cmd/go_migrate
OUTPUT_DIR=bin
VERSION?=$(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
GIT_COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
# Flags de compilación
LD_FLAGS=-ldflags="-s -w -X main.Version=$(VERSION) -X main.BuildTime=$(BUILD_TIME) -X main.GitCommit=$(GIT_COMMIT)"
# Default: compilar para el SO actual
build: build-$(OS)
ifeq ($(OS),Windows_NT)
build-native: build-windows
else
build-native: build-linux
endif
# Compilar para Linux (sin CGO para máxima compatibilidad)
build-linux:
@echo "Compilando para Linux..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64"
# Compilar para Windows
build-windows:
@echo "Compilando para Windows..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe"
# Compilar para ambas plataformas
build-all: build-linux build-windows
@echo ""
@echo "Binarios compilados:"
@ls -lh $(OUTPUT_DIR)/$(BINARY_NAME)*
# Compilar para Linux arm64 (opcional, para Raspberry Pi, etc.)
build-linux-arm64:
@echo "Compilando para Linux ARM64..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64"
# Limpiar binarios
clean:
@echo "Limpiando binarios..."
@rm -rf $(OUTPUT_DIR)
@echo "Limpieza completada"
# Ayuda
help:
@echo "Comandos disponibles:"
@echo ""
@echo " make build - Compilar para el SO actual (Linux/Windows)"
@echo " make build-linux - Compilar para Linux x86_64"
@echo " make build-windows - Compilar para Windows x86_64"
@echo " make build-linux-arm64 - Compilar para Linux ARM64 (opcional)"
@echo " make build-all - Compilar para Linux y Windows"
@echo " make clean - Eliminar binarios compilados"
@echo " make help - Mostrar esta ayuda"
@echo ""
@echo "Ejemplos de uso:"
@echo " make build-all # Crear binarios para ambas plataformas"
@echo " make build-linux OS= # Crear solo para Linux"
@echo ""

View File

@@ -1,77 +0,0 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func connectToSqlServer() (*sql.DB, error) {
db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
}
return db, nil
}
func connectToPostgres() (*pgxpool.Pool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
pool, err := pgxpool.New(ctx, config.App.TargetDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("Unable to ping postgres: %w", err)
}
return pool, nil
}
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
var sourceDbErr, targetDbErr error
var sourceDb *sql.DB
var targetDb *pgxpool.Pool
var wg sync.WaitGroup
wg.Go(func() {
sourceDb, sourceDbErr = connectToSqlServer()
if sourceDbErr != nil {
log.Error("Unable to connect to source db: ", sourceDbErr)
}
})
wg.Go(func() {
targetDb, targetDbErr = connectToPostgres()
if targetDbErr != nil {
log.Error("Unable to connect to target db: ", targetDbErr)
}
})
wg.Wait()
if sourceDbErr != nil || targetDbErr != nil {
return nil, nil, errors.New("Unable to connect to databases")
}
return sourceDb, targetDb, nil
}

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@@ -13,5 +14,13 @@ func configureLog() {
DisableSorting: false, DisableSorting: false,
PadLevelText: true, PadLevelText: true,
}) })
log.SetLevel(log.DebugLevel)
logLevelEnv := config.App.LogLevel
logLevel, err := log.ParseLevel(logLevelEnv)
if err != nil {
log.Warnf("Nivel de log inválido '%s', usando INFO por defecto", logLevelEnv)
logLevel = log.InfoLevel
}
log.SetLevel(logLevel)
} }

View File

@@ -2,18 +2,18 @@ package main
import ( import (
"context" "context"
"database/sql"
"sync" "sync"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
) )
func main() { func main() {
@@ -24,7 +24,7 @@ func main() {
log.Fatalf("error leyendo configuracion: %v", err) log.Fatalf("error leyendo configuracion: %v", err)
} }
log.Debugf("Config: %+v", migrationConfig) // log.Debugf("Config: %+v", migrationConfig)
startTime := time.Now() startTime := time.Now()
@@ -33,11 +33,33 @@ func main() {
log.Info("=== Starting migration ===") log.Info("=== Starting migration ===")
sourceDb, targetDb, connError := connectToDatabases() var wgConnect errgroup.Group
if connError != nil { var sourceDb, targetDb dbwrapper.DbWrapper
log.Fatal("Connection error: ", connError)
}
wgConnect.Go(func() error {
var err error
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
wgConnect.Go(func() error {
var err error
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
if err := wgConnect.Wait(); err != nil {
log.Error("Connection error: ", err)
return
}
defer sourceDb.Close() defer sourceDb.Close()
defer targetDb.Close() defer targetDb.Close()
@@ -70,14 +92,14 @@ func main() {
func processMigrationJobs( func processMigrationJobs(
ctx context.Context, ctx context.Context,
sourceDb *sql.DB, sourceDb dbwrapper.DbWrapper,
targetDb *pgxpool.Pool, targetDb dbwrapper.DbWrapper,
jobs []config.Job, jobs []config.Job,
maxParallelWorkers int, maxParallelWorkers int,
) []JobResult { ) []models.JobResult {
if len(jobs) == 0 { if len(jobs) == 0 {
log.Info("No migration jobs configured") log.Info("No migration jobs configured")
return []JobResult{} return []models.JobResult{}
} }
if maxParallelWorkers <= 0 { if maxParallelWorkers <= 0 {
@@ -90,16 +112,23 @@ func processMigrationJobs(
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers) log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan JobResult, len(jobs)) chJobResults := make(chan models.JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs)) chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup var wgJobs sync.WaitGroup
targetDbWrapper := db.NewPostgresDbWrapper(targetDb)
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer() loader := loaders.NewGenericLoader(targetDb)
loader := loaders.NewPostgresLoader(targetDb)
var azureClient *azure.Client
if config.App.AzureStorage.Enabled {
var err error
azureClient, err = azure.NewClient(config.App.AzureStorage)
if err != nil {
log.Fatalf("Failed to create Azure storage client: %v", err)
}
}
for i := range maxParallelWorkers { for i := range maxParallelWorkers {
wgJobs.Go(func() { wgJobs.Go(func() {
@@ -107,13 +136,14 @@ func processMigrationJobs(
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
res := processMigrationJob( res := processMigrationJob(
ctx, ctx,
targetDbWrapper, targetDb,
sourceTableAnalyzer, sourceTableAnalyzer,
targetTableAnalyzer, targetTableAnalyzer,
extractor, extractor,
transformer, azureClient,
loader, loader,
job, job,
targetDb.GetDialect(),
) )
chJobResults <- res chJobResults <- res
@@ -131,10 +161,26 @@ func processMigrationJobs(
close(chJobResults) close(chJobResults)
}() }()
var finalResults []JobResult var finalResults []models.JobResult
for res := range chJobResults { for res := range chJobResults {
finalResults = append(finalResults, res) finalResults = append(finalResults, res)
} }
return finalResults return finalResults
} }
func connectWithTimeout(ctx context.Context, dbType string, dbUrl string, timeout time.Duration) (dbwrapper.DbWrapper, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sourceDb, err := dbwrapper.New(dbType)
if err != nil {
return nil, err
}
if err = sourceDb.Connect(localCtx, dbUrl); err != nil {
return nil, err
}
return sourceDb, nil
}

View File

@@ -1,13 +0,0 @@
package main
import "time"
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}

View File

@@ -2,35 +2,54 @@ package main
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
func buildTruncateQuery(targetDbType, schema, table, truncateMethod string) string {
if truncateMethod == "DELETE" {
if targetDbType == "postgres" {
return fmt.Sprintf(`DELETE FROM "%s"."%s"`, schema, table)
}
return fmt.Sprintf(`DELETE FROM [%s].[%s]`, schema, table)
}
if targetDbType == "postgres" {
return fmt.Sprintf(`TRUNCATE TABLE "%s"."%s"`, schema, table)
}
return fmt.Sprintf(`TRUNCATE TABLE [%s].[%s]`, schema, table)
}
func processMigrationJob( func processMigrationJob(
ctx context.Context, ctx context.Context,
// sourceDbWrapper db.DbWrapper, targetDbWrapper dbwrapper.DbWrapper,
targetDbWrapper db.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer, sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer, targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor, extractor etl.Extractor,
transformer etl.Transformer, azureClient *azure.Client,
loader etl.Loader, loader etl.Loader,
job config.Job, job config.Job,
) JobResult { targetDbType string,
) models.JobResult {
transformer := transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
localCtx, cancel := context.WithCancel(ctx) localCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
result := JobResult{ result := models.JobResult{
JobName: job.Name, JobName: job.Name,
StartTime: time.Now(), StartTime: time.Now(),
} }
@@ -66,7 +85,13 @@ func processMigrationJob(
return result return result
} }
for _, query := range job.PreSQL { preSqlQueries := job.TargetTable.PreSQL
if job.TruncateTarget {
truncateQuery := buildTruncateQuery(targetDbType, job.TargetTable.Schema, job.TargetTable.Table, job.TruncateMethod)
preSqlQueries = append([]string{truncateQuery}, job.TargetTable.PreSQL...)
}
for _, query := range preSqlQueries {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil { if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err result.Error = err
return result return result
@@ -79,6 +104,7 @@ func processMigrationJob(
job.SourceTable.TableInfo, job.SourceTable.TableInfo,
job.SourceTable.PrimaryKey, job.SourceTable.PrimaryKey,
job.RowsPerPartition, job.RowsPerPartition,
job.Range,
) )
if err != nil { if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
@@ -129,8 +155,9 @@ func processMigrationJob(
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
extractor.Exec( extractors.Consume(
localCtx, localCtx,
extractor,
job.SourceTable, job.SourceTable,
sourceColTypes, sourceColTypes,
job.BatchSize, job.BatchSize,
@@ -214,7 +241,7 @@ func processMigrationJob(
cancel() cancel()
}() }()
for _, query := range job.PostSQL { for _, query := range job.TargetTable.PostSQL {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil { if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err result.Error = err
return result return result
@@ -234,5 +261,9 @@ func processMigrationJob(
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded) result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
result.RowsFailed = atomic.LoadInt64(&rowsFailed) result.RowsFailed = atomic.LoadInt64(&rowsFailed)
if result.RowsRead != result.RowsLoaded {
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
}
return result return result
} }

View File

@@ -28,8 +28,13 @@ jobs:
target: target:
schema: Cartografia schema: Cartografia
table: MANZANA table: MANZANA
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto - name: red_puerto
enabled: true enabled: true
@@ -40,7 +45,30 @@ jobs:
target: target:
schema: Red schema: Red
table: PUERTO table: PUERTO
pre_sql: pre_sql:
- 'SELECT 1' - 'SELECT 1'
post_sql: post_sql:
- "SELECT 1" - "SELECT 1"
- name: infraestructura_site_holder__attach
source:
schema: Infraestructura
table: SITE_HOLDER__ATTACH
primary_key: GDB_ARCHIVE_OID
target:
schema: Infraestructura
table: SITE_HOLDER__ATTACH
to_storage:
columns:
- source: DATA
target: FILE_URL
mode: REFERENCE_ONLY
max_extractors: 8
max_loaders: 4
queue_size: 32
batch_size: 1
retry:
attempts: 5
base_delay_ms: 1000
max_delay_ms: 15000
max_jitter_ms: 500

12
go.mod
View File

@@ -1,12 +1,13 @@
module git.ksdemosapps.com/kylesoda/go-migrate module git.ksdemosapps.com/kylesoda/go-migrate
go 1.25.7 go 1.26
require ( require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4 github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/ilyakaznacheev/cleanenv v1.5.0
github.com/jackc/pgx/v5 v5.9.1 github.com/jackc/pgx/v5 v5.9.1
github.com/joho/godotenv v1.5.1
github.com/microsoft/go-mssqldb v1.9.8 github.com/microsoft/go-mssqldb v1.9.8
github.com/sirupsen/logrus v1.9.4 github.com/sirupsen/logrus v1.9.4
github.com/twpayne/go-geom v1.6.1 github.com/twpayne/go-geom v1.6.1
@@ -15,15 +16,20 @@ require (
) )
require ( require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
github.com/BurntSushi/toml v1.6.0 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/kr/text v0.2.0 // indirect github.com/joho/godotenv v1.5.1 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect
golang.org/x/crypto v0.48.0 // indirect golang.org/x/crypto v0.48.0 // indirect
golang.org/x/net v0.51.0 // indirect
golang.org/x/sys v0.41.0 // indirect golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect golang.org/x/text v0.34.0 // indirect
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect
) )

16
go.sum
View File

@@ -4,19 +4,25 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 h1:Hk5QBxZQC1jb2Fwj6mpz
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1/go.mod h1:IYus9qsFobWIc2YVwe/WPjcnyCkPKtnHAqUYeebc8z0= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1/go.mod h1:IYus9qsFobWIc2YVwe/WPjcnyCkPKtnHAqUYeebc8z0=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA= github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI= github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.8.1 h1:/Zt+cDPnpC3OVDm/JKLOs7M2DKmLRIIp3XIx9pHHiig=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.8.1/go.mod h1:Ng3urmn6dYe8gnbCMoHHVl5APYz2txho3koEkV2o2HA=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.4.0 h1:E4MgwLBGeVB5f2MdcIVD3ELVAWpr+WD6MUe1i+tM/PA= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.4.0 h1:E4MgwLBGeVB5f2MdcIVD3ELVAWpr+WD6MUe1i+tM/PA=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.4.0/go.mod h1:Y2b/1clN4zsAoUd/pgNAQHjLDnTis/6ROkUfyob6psM= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.4.0/go.mod h1:Y2b/1clN4zsAoUd/pgNAQHjLDnTis/6ROkUfyob6psM=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0 h1:nCYfgcSyHZXJI8J0IWE5MsCGlb2xp9fJiXyxWgmOFg4= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0 h1:nCYfgcSyHZXJI8J0IWE5MsCGlb2xp9fJiXyxWgmOFg4=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0/go.mod h1:ucUjca2JtSZboY8IoUqyQyuuXvwbMBVwFOm0vdQPNhA= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0/go.mod h1:ucUjca2JtSZboY8IoUqyQyuuXvwbMBVwFOm0vdQPNhA=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 h1:jWQK1GI+LeGGUKBADtcH2rRqPxYB1Ljwms5gFA2LqrM=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4/go.mod h1:8mwH4klAm9DUgR2EEHyEEAQlRDvLPyg5fQry3y+cDew=
github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgvJqCH0sFfrBUTnUJSBrBf7++ypk+twtRs= github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgvJqCH0sFfrBUTnUJSBrBf7++ypk+twtRs=
github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk= github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk=
github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY=
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -32,6 +38,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/ilyakaznacheev/cleanenv v1.5.0 h1:0VNZXggJE2OYdXE87bfSSwGxeiGt9moSR2lOrsHHvr4=
github.com/ilyakaznacheev/cleanenv v1.5.0/go.mod h1:a5aDzaJrLCQZsazHol1w8InnDcOX0OColm64SlIi6gk=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -42,8 +50,8 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
@@ -83,3 +91,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 h1:slmdOY3vp8a7KQbHkL+FLbvbkgMqmXojpFUO/jENuqQ=
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3/go.mod h1:oVgVk4OWVDi43qWBEyGhXgYxt7+ED4iYNpTngSLX2Iw=

View File

@@ -0,0 +1,87 @@
package azure
import (
"context"
"errors"
"fmt"
"net/url"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)
var (
ErrInvalidConnectionString = errors.New("invalid connection string")
ErrContainerNotFound = errors.New("container not found")
ErrBlobNotFound = errors.New("blob not found")
ErrInvalidInput = errors.New("invalid input parameters")
)
type Client struct {
client *azblob.Client
azureStorageConfig config.AzureStorageConfig
}
func NewClient(azureStorageConfig config.AzureStorageConfig) (*Client, error) {
protocol := "https"
if !azureStorageConfig.UseHTTPS {
protocol = "http"
}
blobEndpoint, _ := url.JoinPath(azureStorageConfig.ServiceURL, azureStorageConfig.AccountName)
connStr := fmt.Sprintf("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;",
protocol, azureStorageConfig.AccountName, azureStorageConfig.AccountKey, blobEndpoint)
client, err := azblob.NewClientFromConnectionString(connStr, nil)
if err != nil {
return nil, fmt.Errorf("creating azure storage client: %w", err)
}
return &Client{
client: client,
azureStorageConfig: azureStorageConfig,
}, nil
}
func (c *Client) CreateContainer(ctx context.Context, containerName string) error {
if containerName == "" {
return ErrInvalidInput
}
_, err := c.client.CreateContainer(ctx, containerName, nil)
if err != nil {
return fmt.Errorf("creating container %s: %w", containerName, err)
}
return nil
}
func (c *Client) UploadBuffer(ctx context.Context, containerName, blobPath string, buffer []byte) error {
if containerName == "" || blobPath == "" || buffer == nil {
return ErrInvalidInput
}
_, err := c.client.UploadBuffer(ctx, containerName, blobPath, buffer, nil)
if err != nil {
return fmt.Errorf("uploading blob %s: %w", blobPath, err)
}
return nil
}
func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []byte) (string, error) {
if blobPath == "" || buffer == nil {
return "", ErrInvalidInput
}
fullPath := blobPath
if c.azureStorageConfig.Prefix != "" {
fullPath, _ = url.JoinPath(c.azureStorageConfig.Prefix, blobPath)
}
if err := c.UploadBuffer(ctx, c.azureStorageConfig.Container, fullPath, buffer); err != nil {
return "", err
}
blobEndpoint, _ := url.JoinPath(c.azureStorageConfig.ServiceURL, c.azureStorageConfig.AccountName)
blobURL, _ := url.JoinPath(blobEndpoint, c.azureStorageConfig.Container, fullPath)
return blobURL, nil
}

View File

@@ -1,41 +1,41 @@
package config package config
import ( import (
"os" "github.com/ilyakaznacheev/cleanenv"
"github.com/joho/godotenv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type appConfig struct { type AzureStorageConfig struct {
SourceDbUrl string AccountName string `env:"AZ_ACCOUNT_NAME"`
TargetDbUrl string Container string `env:"AZ_CONTAINER"`
AccountKey string `env:"AZ_ACCOUNT_KEY"`
UseHTTPS bool `env:"AZ_USE_HTTPS" env-default:"true"`
ServiceURL string `env:"AZ_SERVICE_URL"`
Prefix string `env:"AZ_PREFIX"`
Enabled bool `env:"AZ_STORAGE_ENABLED"`
} }
func loadEnv() { type appConfig struct {
err := godotenv.Load() SourceDbUrl string `env:"SOURCE_DB_URL" env-required:"true"`
if err != nil { TargetDbUrl string `env:"TARGET_DB_URL" env-required:"true"`
log.Warn("Warning: could not load .env file") LogLevel string `env:"LOG_LEVEL" env-default:"INFO"`
} AzureStorage AzureStorageConfig
} }
func getAppConfig() appConfig { func getAppConfig() appConfig {
loadEnv() var cfg appConfig
sourceDbUrl := os.Getenv("SOURCE_DB_URL") err := cleanenv.ReadConfig(".env", &cfg)
if sourceDbUrl == "" { if err != nil {
log.Fatal("SOURCE_DB_URL environment variable not set") log.Warn("Could not load .env file")
} }
targetDbUrl := os.Getenv("TARGET_DB_URL") err = cleanenv.ReadEnv(&cfg)
if targetDbUrl == "" { if err != nil {
log.Fatal("TARGET_DB_URL environment variable not set") log.Fatalf("Error al cargar variables: %v", err)
} }
return appConfig{ return cfg
SourceDbUrl: sourceDbUrl,
TargetDbUrl: targetDbUrl,
}
} }
var App appConfig = getAppConfig() var App appConfig = getAppConfig()

View File

@@ -14,6 +14,16 @@ type RetryConfig struct {
MaxJitterMs int `yaml:"max_jitter_ms"` MaxJitterMs int `yaml:"max_jitter_ms"`
} }
type ToStorageColumnConfig struct {
Source string `yaml:"source"`
Target string `yaml:"target"`
Mode string `yaml:"mode"`
}
type ToStorageConfig struct {
Columns []ToStorageColumnConfig `yaml:"columns"`
}
type JobConfig struct { type JobConfig struct {
MaxExtractors int `yaml:"max_extractors"` MaxExtractors int `yaml:"max_extractors"`
MaxLoaders int `yaml:"max_loaders"` MaxLoaders int `yaml:"max_loaders"`
@@ -26,6 +36,7 @@ type JobConfig struct {
MaxChunkErrors int `yaml:"max_chunk_errors"` MaxChunkErrors int `yaml:"max_chunk_errors"`
Retry RetryConfig `yaml:"retry"` Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64 RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
} }
type TableInfo struct { type TableInfo struct {
@@ -33,23 +44,31 @@ type TableInfo struct {
Table string `yaml:"table"` Table string `yaml:"table"`
} }
type TargetTableInfo struct {
TableInfo `yaml:",inline"`
}
type SourceTableInfo struct { type SourceTableInfo struct {
TableInfo `yaml:",inline"` TableInfo `yaml:",inline"`
PrimaryKey string `yaml:"primary_key"` PrimaryKey string `yaml:"primary_key"`
} }
type TargetTableInfo struct {
TableInfo `yaml:",inline"`
PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"`
}
type RangeConfig struct {
Min int64 `yaml:"min"`
Max int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
type Job struct { type Job struct {
Name string `yaml:"name"` Name string `yaml:"name"`
Enabled bool `yaml:"enabled"` Enabled bool `yaml:"enabled"`
SourceTable SourceTableInfo `yaml:"source"` SourceTable SourceTableInfo `yaml:"source"`
TargetTable TargetTableInfo `yaml:"target"` TargetTable TargetTableInfo `yaml:"target"`
PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"`
JobConfig `yaml:",inline"` JobConfig `yaml:",inline"`
Range RangeConfig `yaml:"range"`
} }
type MigrationConfig struct { type MigrationConfig struct {
@@ -76,6 +95,8 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
c.MaxParallelWorkers = raw.MaxParallelWorkers c.MaxParallelWorkers = raw.MaxParallelWorkers
c.Defaults = raw.Defaults c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition) c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs { for _, node := range raw.Jobs {

View File

@@ -103,8 +103,8 @@ func ExtractorErrorHandler(
if err.HasLastId { if err.HasLastId {
newPartition.ParentId = err.Partition.Id newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New() newPartition.Id = uuid.New()
newPartition.LowerLimit = err.LastId newPartition.Range.Min = err.LastId
newPartition.IsLowerLimitInclusive = false newPartition.Range.IsMinInclusive = false
} }
requeueWithBackoff(ctx, delay, func() { requeueWithBackoff(ctx, delay, func() {

View File

@@ -0,0 +1,19 @@
package dbwrapper
import "fmt"
type Factory func() DbWrapper
var drivers = make(map[string]Factory)
func Register(name string, factory Factory) {
drivers[name] = factory
}
func New(driverType string) (DbWrapper, error) {
factory, ok := drivers[driverType]
if !ok {
return nil, fmt.Errorf("driver not yet supported: %s", driverType)
}
return factory(), nil
}

View File

@@ -0,0 +1,176 @@
package dbwrapper
import (
"context"
"database/sql"
"fmt"
mssql "github.com/microsoft/go-mssqldb"
)
func init() {
Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
})
}
type mssqlRowResult struct {
row *sql.Row
}
func (mr *mssqlRowResult) Scan(dest ...any) error {
return mr.row.Scan(dest...)
}
type mssqlRowsResult struct {
columns []string
rows *sql.Rows
}
func (mr *mssqlRowsResult) Close() error {
return mr.rows.Close()
}
func (mr *mssqlRowsResult) Columns() ([]string, error) {
if mr.columns != nil {
return mr.columns, nil
}
return mr.rows.Columns()
}
func (mr *mssqlRowsResult) Err() error {
return mr.rows.Err()
}
func (mr *mssqlRowsResult) Next() bool {
return mr.rows.Next()
}
func (mr *mssqlRowsResult) Scan(dest ...any) error {
return mr.rows.Scan(dest...)
}
func (mr *mssqlRowsResult) Values() ([]any, error) {
columns, err := mr.Columns()
if err != nil {
return nil, err
}
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := mr.rows.Scan(scanArgs...); err != nil {
return nil, err
}
return rowValues, nil
}
type mssqlDbWrapper struct {
db *sql.DB
dialect string
}
func (mw *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error {
db, err := sql.Open("sqlserver", dbUrl)
if err != nil {
return err
}
if err := db.PingContext(ctx); err != nil {
if err := db.Close(); err != nil {
return err
}
return err
}
mw.db = db
return nil
}
func (mw *mssqlDbWrapper) Close() error {
return mw.db.Close()
}
func (mw *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, execErr := mw.db.ExecContext(ctx, query, args...)
if execErr != nil {
return ExecResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: affectedRows}, nil
}
func (mw *mssqlDbWrapper) GetDialect() string {
return mw.dialect
}
func (mw *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := mw.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &mssqlRowsResult{columns: nil, rows: rows}, nil
}
func (mw *mssqlDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := mw.db.QueryRowContext(ctx, query, args...)
return &mssqlRowResult{row: row}
}
func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
tx, err := mw.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
fullTableName := fmt.Sprintf("[%s].[%s]", schema, table)
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, columnNames...))
if err != nil {
tx.Rollback()
return 0, err
}
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
if err := stmt.Close(); err != nil {
tx.Rollback()
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
rowsAffected, raErr := result.RowsAffected()
if raErr != nil {
return 0, nil
}
return rowsAffected, nil
}

View File

@@ -0,0 +1,128 @@
package dbwrapper
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func init() {
Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
})
}
type postgresRowResult struct {
row pgx.Row
}
func (pr *postgresRowResult) Scan(dest ...any) error {
return pr.row.Scan(dest...)
}
type postgresRowsResult struct {
columns []string
rows pgx.Rows
}
func (pr *postgresRowsResult) Close() error {
pr.rows.Close()
return nil
}
func (pr *postgresRowsResult) Columns() ([]string, error) {
if pr.columns != nil {
return pr.columns, nil
}
rawColumns := pr.rows.FieldDescriptions()
if rawColumns == nil {
return nil, errors.New("error retrieving columns")
}
columns := make([]string, 0, len(rawColumns))
for _, rc := range rawColumns {
columns = append(columns, rc.Name)
}
return columns, nil
}
func (pr *postgresRowsResult) Err() error {
return pr.rows.Err()
}
func (pr *postgresRowsResult) Next() bool {
return pr.rows.Next()
}
func (pr *postgresRowsResult) Scan(dest ...any) error {
return pr.rows.Scan(dest...)
}
func (pr *postgresRowsResult) Values() ([]any, error) {
return pr.rows.Values()
}
type postgresDbWrapper struct {
db *pgxpool.Pool
dialect string
}
func (pw *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error {
pool, err := pgxpool.New(ctx, dbUrl)
if err != nil {
return err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return err
}
pw.db = pool
return nil
}
func (pw *postgresDbWrapper) Close() error {
pw.db.Close()
return nil
}
func (pw *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, err := pw.db.Exec(ctx, query, args...)
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: result.RowsAffected()}, nil
}
func (pw *postgresDbWrapper) GetDialect() string {
return pw.dialect
}
func (pw *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := pw.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
return &postgresRowsResult{columns: nil, rows: rows}, nil
}
func (pw *postgresDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := pw.db.QueryRow(ctx, query, args...)
return &postgresRowResult{row: row}
}
func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
affectedRows, err := pw.db.CopyFrom(ctx, pgx.Identifier{schema, table}, columnNames, pgx.CopyFromRows(rows))
if err != nil {
return 0, err
}
return affectedRows, nil
}

View File

@@ -0,0 +1,35 @@
package dbwrapper
import (
"context"
"errors"
)
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
type ExecResult struct {
AffectedRows int64
}
type RowsResult interface {
Close() error
Columns() ([]string, error)
Err() error
Next() bool
Scan(dest ...any) error
Values() ([]any, error)
}
type RowResult interface {
Scan(dest ...any) error
}
type DbWrapper interface {
Close() error
Connect(ctx context.Context, dbUrl string) error
Exec(ctx context.Context, query string, args ...any) (ExecResult, error)
GetDialect() string
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
}

View File

@@ -1,30 +0,0 @@
package db
import (
"context"
"database/sql"
)
type MssqlDbWrapper struct {
db *sql.DB
}
func NewMssqlDbWrapper(db *sql.DB) DbWrapper {
return &MssqlDbWrapper{db: db}
}
func (wrapper *MssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, execErr := wrapper.db.ExecContext(ctx, query, args...)
if execErr != nil {
return DbWrapperResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: affectedRows,
}, nil
}

View File

@@ -1,47 +0,0 @@
package db
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
type PostgresDbWrapper struct {
db *pgxpool.Pool
}
func NewPostgresDbWrapper(db *pgxpool.Pool) DbWrapper {
return &PostgresDbWrapper{db: db}
}
func (wrapper *PostgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
result, err := wrapper.db.Exec(ctx, query, args...)
if err != nil {
return DbWrapperResult{}, err
}
return DbWrapperResult{
AffectedRows: result.RowsAffected(),
}, nil
}

View File

@@ -1,11 +0,0 @@
package db
import "context"
type DbWrapperResult struct {
AffectedRows int64
}
type DbWrapper interface {
Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error)
}

View File

@@ -0,0 +1,101 @@
package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
func Consume(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := extractor.Exec(
ctx,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
)
if rowsReadResult > 0 {
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
}
}
continue
}
wgActivePartitions.Done()
}
}
}

View File

@@ -5,24 +5,22 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"slices"
"strings" "strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
type MssqlExtractor struct { type MssqlExtractor struct {
db *sql.DB db dbwrapper.DbWrapper
} }
func NewMssqlExtractor(db *sql.DB) etl.Extractor { func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &MssqlExtractor{db: db} return &MssqlExtractor{db: db}
} }
@@ -31,6 +29,9 @@ func buildExtractQueryMssql(
columns []models.ColumnType, columns []models.ColumnType,
includeRange bool, includeRange bool,
isMinInclusive bool, isMinInclusive bool,
isMaxInclusive bool,
hasMin bool,
hasMax bool,
) string { ) string {
var sbQuery strings.Builder var sbQuery strings.Builder
@@ -54,15 +55,32 @@ func buildExtractQueryMssql(
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table) fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
if includeRange { if includeRange && (hasMin || hasMax) {
fmt.Fprintf(&sbQuery, " WHERE [%s]", tableInfo.PrimaryKey) sbQuery.WriteString(" WHERE ")
if isMinInclusive {
sbQuery.WriteString(" >=") if hasMin {
} else { fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
sbQuery.WriteString(" >") if isMinInclusive {
sbQuery.WriteString(" >=")
} else {
sbQuery.WriteString(" >")
}
sbQuery.WriteString(" @min")
} }
fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", tableInfo.PrimaryKey) if hasMin && hasMax {
sbQuery.WriteString(" AND ")
}
if hasMax {
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
if isMaxInclusive {
sbQuery.WriteString(" <=")
} else {
sbQuery.WriteString(" <")
}
sbQuery.WriteString(" @max")
}
} }
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey) fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
@@ -73,14 +91,14 @@ func buildExtractQueryMssql(
func errorFromLastRow( func errorFromLastRow(
lastRow models.UnknownRowValues, lastRow models.UnknownRowValues,
indexPrimaryKey int, indexPrimaryKey int,
partition *models.Partition, partition models.Partition,
previousError error, previousError error,
) *custom_errors.ExtractorError { ) *custom_errors.ExtractorError {
lastIdRawValue := lastRow[indexPrimaryKey] lastIdRawValue := lastRow[indexPrimaryKey]
lastId, ok := convert.ToInt64(lastIdRawValue) lastId, ok := convert.ToInt64(lastIdRawValue)
if !ok { if !ok {
currentPartition := *partition currentPartition := partition
currentPartition.RetryCounter = 3 currentPartition.RetryCounter = 3
return &custom_errors.ExtractorError{ return &custom_errors.ExtractorError{
Partition: currentPartition, Partition: currentPartition,
@@ -91,14 +109,14 @@ func errorFromLastRow(
} }
return &custom_errors.ExtractorError{ return &custom_errors.ExtractorError{
Partition: *partition, Partition: partition,
HasLastId: true, HasLastId: true,
LastId: lastId, LastId: lastId,
Msg: previousError.Error(), Msg: previousError.Error(),
} }
} }
func (mssqlEx *MssqlExtractor) ProcessPartition( func (mssqlEx *MssqlExtractor) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -106,21 +124,23 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
partition models.Partition, partition models.Partition,
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
rowsRead *int64, ) (int, error) {
) error { hasMin := partition.HasRange && partition.Range.Min > 0
query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive) hasMax := partition.HasRange && partition.Range.Max > 0
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
var queryArgs []any var queryArgs []any
if partition.ShouldUseRange { if hasMin {
queryArgs = append(queryArgs, queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min))
sql.Named("min", partition.LowerLimit), }
sql.Named("max", partition.UpperLimit), if hasMax {
) queryArgs = append(queryArgs, sql.Named("max", partition.Range.Max))
} }
rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...) rowsRead := 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
if err != nil { if err != nil {
return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
} }
defer rows.Close() defer rows.Close()
@@ -136,7 +156,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
if err := rows.Scan(scanArgs...); err != nil { if err := rows.Scan(scanArgs...); err != nil {
if len(batchRows) == 0 { if len(batchRows) == 0 {
return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
} }
lastRow := batchRows[len(batchRows)-1] lastRow := batchRows[len(batchRows)-1]
@@ -144,134 +164,46 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
select { select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done(): case <-ctx.Done():
return nil return rowsRead, ctx.Err()
} }
atomic.AddInt64(rowsRead, int64(len(batchRows))) return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err)
} }
rowsRead++
batchRows = append(batchRows, rowValues) batchRows = append(batchRows, rowValues)
if len(batchRows) >= batchSize { if len(batchRows) >= batchSize {
select { select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done(): case <-ctx.Done():
return nil return rowsRead, ctx.Err()
} }
atomic.AddInt64(rowsRead, int64(len(batchRows)))
batchRows = make([]models.UnknownRowValues, 0, batchSize) batchRows = make([]models.UnknownRowValues, 0, batchSize)
} }
} }
if err := rows.Err(); err != nil { if err := rows.Err(); err != nil {
if errors.Is(err, ctx.Err()) { if errors.Is(err, ctx.Err()) {
return ctx.Err() return rowsRead, ctx.Err()
} }
if len(batchRows) == 0 { if len(batchRows) > 0 {
return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} lastRow := batchRows[len(batchRows)-1]
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
} }
lastRow := batchRows[len(batchRows)-1] return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err)
} }
if len(batchRows) > 0 { if len(batchRows) > 0 {
select { select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done(): case <-ctx.Done():
return nil return rowsRead, ctx.Err()
} }
atomic.AddInt64(rowsRead, int64(len(batchRows)))
} }
return nil return rowsRead, nil
}
func (mssqlEx *MssqlExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
err := mssqlEx.ProcessPartition(
ctx,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
rowsRead,
)
if err != nil {
var exError *custom_errors.ExtractorError
var jobError *custom_errors.JobError
if errors.As(err, &exError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
}
}
continue
}
wgActivePartitions.Done()
}
}
} }

View File

@@ -2,29 +2,34 @@ package extractors
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresExtractor struct { type PostgresExtractor struct {
db *pgxpool.Pool db dbwrapper.DbWrapper
} }
func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor { func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &PostgresExtractor{db: pool} return &PostgresExtractor{db: db}
} }
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string { func buildExtractQueryPostgres(
sourceDbInfo config.SourceTableInfo,
columns []models.ColumnType,
includeRange bool,
isMinInclusive bool,
isMaxInclusive bool,
hasMin bool,
hasMax bool,
) string {
var sbColumns strings.Builder var sbColumns strings.Builder
if len(columns) == 0 { if len(columns) == 0 {
@@ -49,10 +54,44 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
} }
} }
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey) query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table)
if includeRange && (hasMin || hasMax) {
query += " WHERE "
paramIdx := 1
if hasMin {
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
if isMinInclusive {
query += " >="
} else {
query += " >"
}
query += fmt.Sprintf(" $%d", paramIdx)
paramIdx++
}
if hasMin && hasMax {
query += " AND "
}
if hasMax {
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
if isMaxInclusive {
query += " <="
} else {
query += " <"
}
query += fmt.Sprintf(" $%d", paramIdx)
}
}
query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey)
return query
} }
func (postgresEx *PostgresExtractor) ProcessPartition( func (postgresEx *PostgresExtractor) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -60,17 +99,23 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
partition models.Partition, partition models.Partition,
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
rowsRead *int64, ) (int, error) {
) error { hasMin := partition.HasRange && partition.Range.Min > 0
query := buildExtractQueryPostgres(tableInfo, columns) hasMax := partition.HasRange && partition.Range.Max > 0
query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
if partition.ShouldUseRange { var queryArgs []any
return errors.New("Batch config not yet supported") if hasMin {
queryArgs = append(queryArgs, partition.Range.Min)
}
if hasMax {
queryArgs = append(queryArgs, partition.Range.Max)
} }
rows, err := postgresEx.db.Query(ctx, query) rowsRead := 0
rows, err := postgresEx.db.Query(ctx, query, queryArgs...)
if err != nil { if err != nil {
return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
} }
defer rows.Close() defer rows.Close()
@@ -79,8 +124,9 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
for rows.Next() { for rows.Next() {
values, err := rows.Values() values, err := rows.Values()
if err != nil { if err != nil {
return errors.New("Unexpected error reading rows from source") return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
} }
rowsRead++
batchRows = append(batchRows, values) batchRows = append(batchRows, values)
@@ -88,41 +134,24 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
select { select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done(): case <-ctx.Done():
return nil return rowsRead, ctx.Err()
} }
atomic.AddInt64(rowsRead, int64(len(batchRows)))
batchRows = make([]models.UnknownRowValues, 0, batchSize) batchRows = make([]models.UnknownRowValues, 0, batchSize)
} }
} }
if err := rows.Err(); err != nil { if err := rows.Err(); err != nil {
return errors.New("Unexpected error reading rows from source") return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
} }
if len(batchRows) > 0 { if len(batchRows) > 0 {
select { select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done(): case <-ctx.Done():
return nil return rowsRead, nil
} }
atomic.AddInt64(rowsRead, int64(len(batchRows)))
} }
return nil return rowsRead, nil
}
func (postgresEx *PostgresExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
} }

View File

@@ -1 +0,0 @@
package extractors

View File

@@ -9,43 +9,32 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresLoader struct { type GenericLoader struct {
db *pgxpool.Pool db dbwrapper.DbWrapper
} }
func NewPostgresLoader(pool *pgxpool.Pool) etl.Loader { func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: pool} return &GenericLoader{db: db}
} }
func mapSlice[T any, V any](input []T, mapper func(T) V) []V { func (gl *GenericLoader) ProcessBatch(
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
colNames []string, colNames []string,
batch models.Batch, batch models.Batch,
) (int, error) { ) (int, error) {
tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} _, err := gl.db.SaveMassive(
_, err := postgresLd.db.CopyFrom(
ctx, ctx,
tableId, tableInfo.Schema,
tableInfo.Table,
colNames, colNames,
pgx.CopyFromRows(batch.Rows), batch.Rows,
) )
if err != nil { if err != nil {
@@ -54,7 +43,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
if pgErr.Code == "23505" { if pgErr.Code == "23505" {
return 0, &custom_errors.JobError{ return 0, &custom_errors.JobError{
ShouldCancelJob: true, ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s", tableId.Sanitize()), Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
Prev: err, Prev: err,
} }
} }
@@ -66,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
return len(batch.Rows), nil return len(batch.Rows), nil
} }
func (postgresLd *PostgresLoader) Exec( func (gl *GenericLoader) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -93,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
return return
} }
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch) processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil { if err != nil {
var ldError *custom_errors.LoaderError var ldError *custom_errors.LoaderError

View File

@@ -1 +0,0 @@
package loaders

View File

@@ -0,0 +1,11 @@
package loaders
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}

View File

@@ -15,7 +15,22 @@ func PartitionRangeGenerator(
tableInfo config.TableInfo, tableInfo config.TableInfo,
partitionColumn string, partitionColumn string,
rowsPerPartition int64, rowsPerPartition int64,
jobRange config.RangeConfig,
) ([]models.Partition, error) { ) ([]models.Partition, error) {
if jobRange.Min > 0 {
return []models.Partition{{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
Min: jobRange.Min,
Max: jobRange.Max,
IsMinInclusive: jobRange.IsMinInclusive,
IsMaxInclusive: jobRange.IsMaxInclusive,
},
}}, nil
}
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo) rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -23,9 +38,9 @@ func PartitionRangeGenerator(
if rowsCount <= rowsPerPartition { if rowsCount <= rowsPerPartition {
return []models.Partition{{ return []models.Partition{{
Id: uuid.New(), Id: uuid.New(),
ShouldUseRange: false, HasRange: false,
RetryCounter: 0, RetryCounter: 0,
}}, nil }}, nil
} }

View File

@@ -8,16 +8,17 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
type MssqlTableAnalyzer struct { type MssqlTableAnalyzer struct {
db *sql.DB db dbwrapper.DbWrapper
} }
func NewMssqlTableAnalyzer(db *sql.DB) etl.TableAnalyzer { func NewMssqlTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
return &MssqlTableAnalyzer{db: db} return &MssqlTableAnalyzer{db: db}
} }
@@ -35,7 +36,7 @@ JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND c.name NOT LIKE 'graph_id%' WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;` ORDER BY c.column_id;`
type rawColumnMssql struct { type rawColumnMssql struct {
@@ -142,7 +143,7 @@ func (ta *MssqlTableAnalyzer) QueryColumnTypes(
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second) localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel() defer cancel()
rows, err := ta.db.QueryContext(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)) rows, err := ta.db.Query(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -183,11 +184,11 @@ JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1) WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name` GROUP BY t.name`
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20) ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel() defer cancel()
var rowsCount int64 var rowsCount int64
err := ta.db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount) err := ta.db.QueryRow(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -215,10 +216,10 @@ ORDER BY batch_id`,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table) tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20) ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel() defer cancel()
rows, err := ta.db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions)) rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -228,13 +229,15 @@ ORDER BY batch_id`,
for rows.Next() { for rows.Next() {
partition := models.Partition{ partition := models.Partition{
Id: uuid.New(), Id: uuid.New(),
ShouldUseRange: true, HasRange: true,
RetryCounter: 0, RetryCounter: 0,
IsLowerLimitInclusive: true, Range: models.PartitionRange{
IsMinInclusive: true,
},
} }
if err := rows.Scan(&partition.LowerLimit, &partition.UpperLimit); err != nil { if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
return nil, err return nil, err
} }

View File

@@ -6,16 +6,16 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
) )
type PostgresTableAnalyzer struct { type PostgresTableAnalyzer struct {
db *pgxpool.Pool db dbwrapper.DbWrapper
} }
func NewPostgresTableAnalyzer(db *pgxpool.Pool) etl.TableAnalyzer { func NewPostgresTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
return &PostgresTableAnalyzer{db: db} return &PostgresTableAnalyzer{db: db}
} }
@@ -125,7 +125,7 @@ func (ta *PostgresTableAnalyzer) QueryColumnTypes(
ctx context.Context, ctx context.Context,
tableInfo config.TableInfo, tableInfo config.TableInfo,
) ([]models.ColumnType, error) { ) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second) localCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel() defer cancel()
rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table) rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table)

View File

@@ -3,18 +3,32 @@ package transformers
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"strings"
"sync" "sync"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
) )
type MssqlTransformer struct{} type MssqlTransformer struct {
toStorage config.ToStorageConfig
sourceTable config.SourceTableInfo
azureClient *azure.Client
}
func NewMssqlTransformer() etl.Transformer { func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.SourceTableInfo, azureClient *azure.Client) etl.Transformer {
return &MssqlTransformer{} return &MssqlTransformer{
toStorage: toStorage,
sourceTable: sourceTable,
azureClient: azureClient,
}
} }
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan { func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
@@ -60,6 +74,65 @@ func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransfor
return plan return plan
} }
func computeStorageTransformationPlan(
ctx context.Context,
azureClient *azure.Client,
toStorage config.ToStorageConfig,
sourceColumns []models.ColumnType,
sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
if azureClient == nil || len(toStorage.Columns) == 0 {
return nil
}
colIndex := make(map[string]int, len(sourceColumns))
for i, col := range sourceColumns {
colIndex[strings.ToUpper(col.Name())] = i
}
var plan []etl.ColumnTransformPlan
for _, storageCol := range toStorage.Columns {
if storageCol.Mode != "REFERENCE_ONLY" {
log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
continue
}
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
if !ok {
log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
continue
}
sourceColName := storageCol.Source
schema := sourceTable.Schema
table := sourceTable.Table
plan = append(plan, etl.ColumnTransformPlan{
Index: idx,
Fn: func(v any) (any, error) {
if v == nil {
return nil, nil
}
b, ok := v.([]byte)
if !ok {
log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
schema, table, sourceColName, v)
return v, nil
}
start := time.Now()
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
if err != nil {
return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
}
log.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
return blobURL, nil
},
})
}
return plan
}
const processBatchCtxCheck = 4096 const processBatchCtxCheck = 4096
func (mssqlTr *MssqlTransformer) ProcessBatch( func (mssqlTr *MssqlTransformer) ProcessBatch(
@@ -100,6 +173,8 @@ func (mssqlTr *MssqlTransformer) Exec(
wgActiveBatches *sync.WaitGroup, wgActiveBatches *sync.WaitGroup,
) { ) {
transformationPlan := computeTransformationPlan(columns) transformationPlan := computeTransformationPlan(columns)
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
transformationPlan = append(transformationPlan, storagePlan...)
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {

View File

@@ -10,7 +10,7 @@ import (
) )
type Extractor interface { type Extractor interface {
ProcessPartition( Exec(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -18,21 +18,7 @@ type Extractor interface {
partition models.Partition, partition models.Partition,
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
rowsRead *int64, ) (int, error)
) error
Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
)
} }
type TransformerFunc func(any) (any, error) type TransformerFunc func(any) (any, error)

View File

@@ -1,6 +1,10 @@
package models package models
import "github.com/google/uuid" import (
"time"
"github.com/google/uuid"
)
type UnknownRowValues = []any type UnknownRowValues = []any
@@ -11,12 +15,27 @@ type Batch struct {
RetryCounter int RetryCounter int
} }
type Partition struct { type PartitionRange struct {
Id uuid.UUID Min int64
ParentId uuid.UUID Max int64
LowerLimit int64 IsMinInclusive bool
UpperLimit int64 IsMaxInclusive bool
IsLowerLimitInclusive bool }
ShouldUseRange bool
RetryCounter int type Partition struct {
Id uuid.UUID
ParentId uuid.UUID
Range PartitionRange
HasRange bool
RetryCounter int
}
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
} }

44
scripts/az-blob/main.go Normal file
View File

@@ -0,0 +1,44 @@
package main
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
)
func main() {
cfg := config.App.AzureStorage
containerName := cfg.Container
client, err := azure.NewClient(cfg)
if err != nil {
log.Fatalf("Error creando cliente: %v", err)
}
ctx := context.Background()
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
blobName := fmt.Sprintf("%sarchivo-%d.txt", cfg.Prefix, id)
content := fmt.Sprintf("Contenido aleatorio: %d", rand.Intn(100000))
err := client.UploadBuffer(ctx, containerName, blobName, []byte(content))
if err != nil {
log.Printf("Fallo al subir %s: %v", blobName, err)
} else {
fmt.Printf("Subido exitosamente: %s\n", blobName)
}
}(i)
}
wg.Wait()
}

View File

@@ -8,13 +8,32 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
func main() { func main() {
log.SetFormatter(&log.TextFormatter{ log.SetFormatter(&log.TextFormatter{
FullTimestamp: true, FullTimestamp: true,
@@ -27,8 +46,8 @@ func main() {
ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second) ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel() defer cancel()
sourcePool, err := db.Connect(ctxSource, config.App.SourceDbUrl) sourcePool, err := Connect(ctxSource, config.App.SourceDbUrl)
defer db.Close(sourcePool) defer Close(sourcePool)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -37,8 +56,8 @@ func main() {
ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second) ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel() defer cancel()
targetPool, err := db.Connect(ctxTarget, config.App.TargetDbUrl) targetPool, err := Connect(ctxTarget, config.App.TargetDbUrl)
defer db.Close(targetPool) defer Close(targetPool)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }