Compare commits
9 Commits
04b799ce08
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
86258718d8
|
|||
|
13cd02a824
|
|||
|
f844004942
|
|||
|
4ba26092a9
|
|||
|
537b7fbd28
|
|||
|
d534314cff
|
|||
|
b386965bb8
|
|||
|
961fa48025
|
|||
|
837fdc7abb
|
@@ -1,24 +0,0 @@
|
||||
# Skill Registry — go-migrate
|
||||
|
||||
Generated: 2026-04-21
|
||||
|
||||
## Compact Rules
|
||||
|
||||
### Go conventions
|
||||
- Use existing error wrapping pattern: `fmt.Errorf("context: %w", err)`
|
||||
- Channel-based pipeline — keep goroutine lifecycle clean (close channels in correct order)
|
||||
- No comments unless non-obvious WHY; no docstrings
|
||||
- Prefer named returns only when it aids clarity in short functions
|
||||
- Use `strings.EqualFold` for case-insensitive column name comparison
|
||||
|
||||
### Project conventions
|
||||
- Config structs live in `internal/app/config/`
|
||||
- ETL interfaces live in `internal/app/etl/types.go`
|
||||
- Transformer implementations in `internal/app/etl/transformers/`
|
||||
- Azure operations via `internal/app/azure/main.go`
|
||||
- Per-job transformer creation (not shared) when job has storage config
|
||||
|
||||
## User Skills
|
||||
| Trigger | Skill |
|
||||
|---------|-------|
|
||||
| sdd-* | SDD workflow skills |
|
||||
17
.env.example
17
.env.example
@@ -1,6 +1,23 @@
|
||||
SOURCE_DB_URL=sqlserver://sa:password@localhost:1433?database=master&packet+size=32767&loc=UTC&dial+timeout=120&connection+timeout=120&KeepAlive=30
|
||||
|
||||
# used only when SOURCE_DB_URL is not set
|
||||
# SOURCE_DB_HOST=localhost
|
||||
# SOURCE_DB_PORT=1433
|
||||
# SOURCE_DB_NAME=master
|
||||
# SOURCE_DB_USER=sa
|
||||
# SOURCE_DB_PWD=secure_password!123
|
||||
# SOURCE_DB_OPTIONS="packet+size=32767&loc=UTC&dial+timeout=120&connection+timeout=120&KeepAlive=30"
|
||||
|
||||
TARGET_DB_URL=postgresql://postgres:password@localhost:5432/db
|
||||
|
||||
# used only when TARGET_DB_URL is not set
|
||||
# TARGET_DB_HOST=localhost
|
||||
# TARGET_DB_PORT=5432
|
||||
# TARGET_DB_NAME=db
|
||||
# TARGET_DB_USER=postgres
|
||||
# TARGET_DB_PWD=secure_password!123
|
||||
# TARGET_DB_OPTIONS=""
|
||||
|
||||
LOG_LEVEL=INFO
|
||||
|
||||
AZ_STORAGE_ENABLED=false
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -29,3 +29,4 @@ go.work.sum
|
||||
# .idea/
|
||||
.vscode/
|
||||
.temp
|
||||
.atl
|
||||
|
||||
92
README.md
Normal file
92
README.md
Normal file
@@ -0,0 +1,92 @@
|
||||
# go-migrate
|
||||
|
||||
Migrador de datos entre SQL Server y PostgreSQL con procesamiento en paralelo.
|
||||
|
||||
## Compilar
|
||||
|
||||
```bash
|
||||
go build -o go-migrate ./cmd/go_migrate
|
||||
```
|
||||
|
||||
## Uso
|
||||
|
||||
```bash
|
||||
./go-migrate [opciones] [<ruta-config>]
|
||||
```
|
||||
|
||||
### Opciones
|
||||
|
||||
| Flag | Descripción |
|
||||
|------|-------------|
|
||||
| `-config <path>` | Ruta al archivo de configuración YAML. También se puede pasar como argumento posicional. Si no se indica, se busca `config.yaml`. |
|
||||
| `-validate` | Compara la cantidad de filas entre origen y destino por cada job. No migra datos. |
|
||||
| `-dry-run` | Valida conexiones, acceso a storage (si aplica) y cuenta filas en origen sin migrar. |
|
||||
|
||||
### Ejemplos
|
||||
|
||||
```bash
|
||||
# Migrar con config.yaml por defecto
|
||||
./go-migrate
|
||||
|
||||
# Usar un archivo de configuración específico
|
||||
./go-migrate -config produccion.yaml
|
||||
|
||||
# Validar que origen y destino tengan la misma cantidad de filas
|
||||
./go-migrate -validate -config produccion.yaml
|
||||
|
||||
# Verificar conectividad sin migrar
|
||||
./go-migrate -dry-run -config produccion.yaml
|
||||
```
|
||||
|
||||
## Configuración
|
||||
|
||||
La herramienta lee credenciales y parámetros desde variables de entorno o un archivo `.env`.
|
||||
|
||||
### Variables clave
|
||||
|
||||
| Variable | Descripción |
|
||||
|----------|-------------|
|
||||
| `SOURCE_DB_URL` | URL de conexión a la base de datos origen (o `SOURCE_DB_HOST`, `SOURCE_DB_NAME`, `SOURCE_DB_USER`, `SOURCE_DB_PWD`). |
|
||||
| `TARGET_DB_URL` | URL de conexión a la base de datos destino (o `TARGET_DB_HOST`, `TARGET_DB_NAME`, `TARGET_DB_USER`, `TARGET_DB_PWD`). |
|
||||
| `LOG_LEVEL` | Nivel de log: `DEBUG`, `INFO`, `WARN`, `ERROR` (por defecto: `INFO`). |
|
||||
|
||||
Para migrar datos binarios a Azure Blob, también se requieren `AZ_STORAGE_ENABLED`, `AZ_ACCOUNT_NAME`, `AZ_CONTAINER`, `AZ_ACCOUNT_KEY`.
|
||||
|
||||
### Archivo de migración (YAML)
|
||||
|
||||
Define los jobs de migración. Ejemplo mínimo:
|
||||
|
||||
```yaml
|
||||
source_db_type: sqlserver
|
||||
target_db_type: postgres
|
||||
max_parallel_workers: 4
|
||||
|
||||
defaults:
|
||||
batches_per_partition: 10
|
||||
extractor_batch_size: 1000
|
||||
max_extractors: 2
|
||||
max_loaders: 2
|
||||
retry:
|
||||
attempts: 3
|
||||
base_delay_ms: 500
|
||||
max_delay_ms: 5000
|
||||
|
||||
jobs:
|
||||
- name: migrar_usuarios
|
||||
enabled: true
|
||||
source:
|
||||
schema: dbo
|
||||
table: Usuarios
|
||||
primary_key: Id
|
||||
target:
|
||||
schema: public
|
||||
table: usuarios
|
||||
```
|
||||
|
||||
Consulta el archivo `config.yaml` de tu entorno para ver los jobs disponibles y sus parámetros específicos.
|
||||
|
||||
## Modos de ejecución
|
||||
|
||||
- **Migración** (por defecto): extrae, transforma y carga datos en paralelo.
|
||||
- **Validación** (`-validate`): cuenta y compara filas entre origen y destino.
|
||||
- **Dry run** (`-dry-run`): verifica conexiones y muestra la cantidad de filas en origen.
|
||||
75
benchmark-results.md
Normal file
75
benchmark-results.md
Normal file
@@ -0,0 +1,75 @@
|
||||
# Benchmark go-migrate — 2,000,000 filas
|
||||
|
||||
**Tabla**: `Cartografia.MANZANA`
|
||||
**Fecha**: 2026-05-29
|
||||
**Entorno**: Docker local (MSSQL 2022 Developer / PostgreSQL 16 + PostGIS)
|
||||
|
||||
---
|
||||
|
||||
## Resultado final — 5 pasadas cada dirección
|
||||
|
||||
| Métrica | MSSQL → PostgreSQL | PostgreSQL → MSSQL |
|
||||
|---|---|---|
|
||||
| **Promedio** | **8.37s** | **16.77s** |
|
||||
| **Mediana** | 8.16s | 16.33s |
|
||||
| **Mínimo** | 7.75s | 16.03s |
|
||||
| **Máximo** | 9.17s | 18.46s |
|
||||
| **Desv. estándar** | 0.56s | 1.01s |
|
||||
| **Throughput promedio** | **~238,892 filas/seg** | **~119,261 filas/seg** |
|
||||
| **Factor** | 1x | **~2x más lento** |
|
||||
|
||||
---
|
||||
|
||||
## Evolución del tuning PG → MSSQL
|
||||
|
||||
| Etapa | Config | Tiempo | Throughput | Δ |
|
||||
|---|---|---|---|---|
|
||||
| Corrida 1 — original | conservadora | 236.8s | ~8,446 /seg | baseline |
|
||||
| Corrida 2 — igualada | mismos parámetros | 21.94s | ~91,148 /seg | +10.8x |
|
||||
| Tuning A | 4ext/8load 50k | 17.37s | ~115,200 /seg | +1.27x |
|
||||
| Tuning C | 16 loaders | 17.26s | ~115,900 /seg | +1.28x |
|
||||
| **Tuning D — óptimo** | **8ext/8load 50k** | **~16.77s** | **~119,261 /seg** | **+1.37x** |
|
||||
| Tablock + 8 loaders | lock exclusivo serial | ~44s | ~45,000 /seg | ❌ regresión |
|
||||
| Tablock + 1 loader | minimal logging | ~47s | ~42,000 /seg | ❌ regresión |
|
||||
|
||||
---
|
||||
|
||||
## Configuración óptima — `config-reverse.yaml`
|
||||
|
||||
```yaml
|
||||
max_parallel_workers: 4
|
||||
defaults:
|
||||
batches_per_partition: 4
|
||||
max_extractors: 8 # ← mayor lever de mejora
|
||||
extractor_batch_size: 25000
|
||||
extractor_queue_size: 32
|
||||
max_transformers: 8
|
||||
transformer_batch_size: 50000
|
||||
transformer_queue_size: 32
|
||||
max_loaders: 8
|
||||
loader_batch_size: 50000 # sweet spot — 75k y 100k peores
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Análisis de la brecha final (~2x)
|
||||
|
||||
La diferencia residual entre ambas direcciones es estructural y está en el protocolo de escritura:
|
||||
|
||||
| Protocolo | Mecanismo | Overhead |
|
||||
|---|---|---|
|
||||
| `pgx.CopyFrom` (→ PG) | PostgreSQL COPY protocol — streaming binario sin SQL | mínimo |
|
||||
| `mssql.CopyIn` (→ MSSQL) | BCP protocol — row-by-row dentro de un bulk statement | mayor por fila |
|
||||
|
||||
`mssql.CopyIn` itera fila a fila via `stmt.ExecContext(row...)` antes del flush final, lo que introduce overhead por fila independientemente del batch size. `pgx.CopyFrom` hace streaming puro.
|
||||
|
||||
---
|
||||
|
||||
## Hallazgos sobre Tablock
|
||||
|
||||
`Tablock: true` en `mssql.BulkOptions` resultó contraproducente en ambos escenarios:
|
||||
|
||||
- **Con 8 loaders paralelos**: cada loader compite por un lock exclusivo de tabla → serialización completa (~44s)
|
||||
- **Con 1 loader + batch enorme**: sin contención de locks, pero overhead de log + gestión de la lock exclusiva superó el beneficio de minimal logging (~47s)
|
||||
|
||||
**Conclusión**: para este patrón de carga (múltiples loaders concurrentes), `Tablock: false` (default) es siempre mejor.
|
||||
112
cmd/go_migrate/dryrun.go
Normal file
112
cmd/go_migrate/dryrun.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type DryRunResult struct {
|
||||
JobName string
|
||||
SourceTable string
|
||||
SourceCount int64
|
||||
Error error
|
||||
}
|
||||
|
||||
func runDryRun(
|
||||
ctx context.Context,
|
||||
azureClient *azure.Client,
|
||||
sourceDb dbwrapper.DbWrapper,
|
||||
jobs []config.Job,
|
||||
maxParallelWorkers int,
|
||||
) {
|
||||
log.Info("=== DRY RUN ===")
|
||||
log.Info("[DB] Source connection: OK")
|
||||
log.Info("[DB] Target connection: OK")
|
||||
|
||||
if azureClient != nil {
|
||||
if err := azureClient.Ping(ctx); err != nil {
|
||||
log.Errorf("[STORAGE] Azure: FAIL — %v", err)
|
||||
} else {
|
||||
log.Info("[STORAGE] Azure: OK")
|
||||
}
|
||||
} else {
|
||||
log.Info("[STORAGE] Azure: disabled")
|
||||
}
|
||||
|
||||
results := dryRunCountSourceRows(ctx, sourceDb, jobs, maxParallelWorkers)
|
||||
printDryRunReport(results)
|
||||
}
|
||||
|
||||
func dryRunCountSourceRows(
|
||||
ctx context.Context,
|
||||
sourceDb dbwrapper.DbWrapper,
|
||||
jobs []config.Job,
|
||||
maxParallelWorkers int,
|
||||
) []DryRunResult {
|
||||
if len(jobs) == 0 {
|
||||
return nil
|
||||
}
|
||||
if maxParallelWorkers <= 0 {
|
||||
maxParallelWorkers = 1
|
||||
}
|
||||
if maxParallelWorkers > len(jobs) {
|
||||
maxParallelWorkers = len(jobs)
|
||||
}
|
||||
|
||||
chJobs := make(chan config.Job, len(jobs))
|
||||
var mu sync.Mutex
|
||||
var results []DryRunResult
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for range maxParallelWorkers {
|
||||
wg.Go(func() {
|
||||
for job := range chJobs {
|
||||
res := DryRunResult{
|
||||
JobName: job.Name,
|
||||
SourceTable: fmt.Sprintf("[%s].[%s]", job.SourceTable.Schema, job.SourceTable.Table),
|
||||
}
|
||||
count, err := countSourceRows(ctx, sourceDb, job)
|
||||
if err != nil {
|
||||
res.Error = err
|
||||
} else {
|
||||
res.SourceCount = count
|
||||
}
|
||||
mu.Lock()
|
||||
results = append(results, res)
|
||||
mu.Unlock()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
chJobs <- job
|
||||
}
|
||||
close(chJobs)
|
||||
wg.Wait()
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func printDryRunReport(results []DryRunResult) {
|
||||
log.Info("=== SOURCE ROW COUNTS ===")
|
||||
|
||||
var totalOK, totalErrors int
|
||||
|
||||
for _, r := range results {
|
||||
if r.Error != nil {
|
||||
log.Errorf("[%s] %s — ERROR: %v", r.JobName, r.SourceTable, r.Error)
|
||||
totalErrors++
|
||||
} else {
|
||||
log.Infof("[%s] %s — rows: %d", r.JobName, r.SourceTable, r.SourceCount)
|
||||
totalOK++
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("=== Dry run complete: %d OK, %d errors ===", totalOK, totalErrors)
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
||||
@@ -17,11 +18,20 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func newTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
|
||||
if db.GetDialect() == "postgres" {
|
||||
return table_analyzers.NewPostgresTableAnalyzer(db)
|
||||
}
|
||||
return table_analyzers.NewMssqlTableAnalyzer(db)
|
||||
}
|
||||
|
||||
func main() {
|
||||
configureLog()
|
||||
checkExpiry()
|
||||
|
||||
configPath := flag.String("config", "", "path to migration config file")
|
||||
validate := flag.Bool("validate", false, "count rows in source and target per job and compare")
|
||||
dryRun := flag.Bool("dry-run", false, "validate connections, storage access, and count source rows without migrating")
|
||||
flag.Parse()
|
||||
|
||||
if flag.NArg() > 1 {
|
||||
@@ -41,17 +51,24 @@ func main() {
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
sourceDbUrl, err := config.App.ResolveSourceDbUrl(migrationConfig.SourceDbType)
|
||||
if err != nil {
|
||||
log.Fatalf("source DB config error: %v", err)
|
||||
}
|
||||
targetDbUrl, err := config.App.ResolveTargetDbUrl(migrationConfig.TargetDbType)
|
||||
if err != nil {
|
||||
log.Fatalf("target DB config error: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
log.Info("=== Starting migration ===")
|
||||
|
||||
var wgConnect errgroup.Group
|
||||
var sourceDb, targetDb dbwrapper.DbWrapper
|
||||
|
||||
wgConnect.Go(func() error {
|
||||
var err error
|
||||
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
|
||||
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, sourceDbUrl, 20*time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -62,7 +79,7 @@ func main() {
|
||||
|
||||
wgConnect.Go(func() error {
|
||||
var err error
|
||||
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
|
||||
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, targetDbUrl, 20*time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -78,7 +95,29 @@ func main() {
|
||||
defer sourceDb.Close()
|
||||
defer targetDb.Close()
|
||||
|
||||
results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
|
||||
var azureClient *azure.Client
|
||||
if config.App.AzureStorage.Enabled {
|
||||
var err error
|
||||
azureClient, err = azure.NewClient(config.App.AzureStorage)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create Azure storage client: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if *dryRun {
|
||||
runDryRun(ctx, azureClient, sourceDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
|
||||
return
|
||||
}
|
||||
|
||||
if *validate {
|
||||
validationResults := validateJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
|
||||
printValidationReport(validationResults)
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("=== Starting migration ===")
|
||||
|
||||
results := processMigrationJobs(ctx, sourceDb, targetDb, azureClient, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
|
||||
|
||||
log.Info("=== RESUMEN DE MIGRACIÓN ===")
|
||||
var totalProcessed, totalErrors int64
|
||||
@@ -109,6 +148,7 @@ func processMigrationJobs(
|
||||
ctx context.Context,
|
||||
sourceDb dbwrapper.DbWrapper,
|
||||
targetDb dbwrapper.DbWrapper,
|
||||
azureClient *azure.Client,
|
||||
jobs []config.Job,
|
||||
maxParallelWorkers int,
|
||||
) []models.JobResult {
|
||||
@@ -131,20 +171,11 @@ func processMigrationJobs(
|
||||
chJobs := make(chan config.Job, len(jobs))
|
||||
var wgJobs sync.WaitGroup
|
||||
|
||||
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
|
||||
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
||||
sourceTableAnalyzer := newTableAnalyzer(sourceDb)
|
||||
targetTableAnalyzer := newTableAnalyzer(targetDb)
|
||||
extractor := extractors.NewExtractor(sourceDb)
|
||||
loader := loaders.NewGenericLoader(targetDb)
|
||||
|
||||
var azureClient *azure.Client
|
||||
if config.App.AzureStorage.Enabled {
|
||||
var err error
|
||||
azureClient, err = azure.NewClient(config.App.AzureStorage)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create Azure storage client: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for i := range maxParallelWorkers {
|
||||
wgJobs.Go(func() {
|
||||
for job := range chJobs {
|
||||
@@ -158,6 +189,7 @@ func processMigrationJobs(
|
||||
azureClient,
|
||||
loader,
|
||||
job,
|
||||
sourceDb.GetDialect(),
|
||||
targetDb.GetDialect(),
|
||||
)
|
||||
|
||||
|
||||
@@ -46,9 +46,15 @@ func processMigrationJob(
|
||||
azureClient *azure.Client,
|
||||
loader loaders.GenericLoader,
|
||||
job config.Job,
|
||||
sourceDbType string,
|
||||
targetDbType string,
|
||||
) models.JobResult {
|
||||
transformer := transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
|
||||
var transformer etl.Transformer
|
||||
if sourceDbType == "postgres" {
|
||||
transformer = transformers.NewPostgresTransformer(job.SourceTable)
|
||||
} else {
|
||||
transformer = transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
|
||||
}
|
||||
localCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
|
||||
192
cmd/go_migrate/validate.go
Normal file
192
cmd/go_migrate/validate.go
Normal file
@@ -0,0 +1,192 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type ValidationResult struct {
|
||||
JobName string
|
||||
SourceTable string
|
||||
TargetTable string
|
||||
SourceCount int64
|
||||
TargetCount int64
|
||||
Match bool
|
||||
Error error
|
||||
}
|
||||
|
||||
func countSourceRows(ctx context.Context, db dbwrapper.DbWrapper, job config.Job) (int64, error) {
|
||||
schema := job.SourceTable.Schema
|
||||
table := job.SourceTable.Table
|
||||
|
||||
hasRange := job.Range.Min != nil || job.Range.Max != nil
|
||||
|
||||
var (
|
||||
query string
|
||||
args []any
|
||||
)
|
||||
|
||||
if hasRange && job.SourceTable.PrimaryKey != "" {
|
||||
query = fmt.Sprintf("SELECT COUNT_BIG(*) FROM [%s].[%s] WHERE 1=1", schema, table)
|
||||
if job.Range.Min != nil {
|
||||
op := ">"
|
||||
if job.Range.IsMinInclusive {
|
||||
op = ">="
|
||||
}
|
||||
query += fmt.Sprintf(" AND [%s] %s @min", job.SourceTable.PrimaryKey, op)
|
||||
args = append(args, sql.Named("min", *job.Range.Min))
|
||||
}
|
||||
if job.Range.Max != nil {
|
||||
op := "<"
|
||||
if job.Range.IsMaxInclusive {
|
||||
op = "<="
|
||||
}
|
||||
query += fmt.Sprintf(" AND [%s] %s @max", job.SourceTable.PrimaryKey, op)
|
||||
args = append(args, sql.Named("max", *job.Range.Max))
|
||||
}
|
||||
} else {
|
||||
query = fmt.Sprintf("SELECT COUNT_BIG(*) FROM [%s].[%s]", schema, table)
|
||||
}
|
||||
|
||||
var count int64
|
||||
if err := db.QueryRow(ctx, query, args...).Scan(&count); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func countTargetRows(ctx context.Context, db dbwrapper.DbWrapper, job config.Job) (int64, error) {
|
||||
schema := job.TargetTable.Schema
|
||||
table := job.TargetTable.Table
|
||||
query := fmt.Sprintf(`SELECT COUNT(*) FROM "%s"."%s"`, schema, table)
|
||||
|
||||
var count int64
|
||||
if err := db.QueryRow(ctx, query).Scan(&count); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func validateJob(ctx context.Context, sourceDb, targetDb dbwrapper.DbWrapper, job config.Job) ValidationResult {
|
||||
result := ValidationResult{
|
||||
JobName: job.Name,
|
||||
SourceTable: fmt.Sprintf("[%s].[%s]", job.SourceTable.Schema, job.SourceTable.Table),
|
||||
TargetTable: fmt.Sprintf(`"%s"."%s"`, job.TargetTable.Schema, job.TargetTable.Table),
|
||||
}
|
||||
|
||||
var (
|
||||
sourceErr, targetErr error
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
result.SourceCount, sourceErr = countSourceRows(ctx, sourceDb, job)
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
result.TargetCount, targetErr = countTargetRows(ctx, targetDb, job)
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
if sourceErr != nil {
|
||||
result.Error = fmt.Errorf("source count failed: %w", sourceErr)
|
||||
return result
|
||||
}
|
||||
if targetErr != nil {
|
||||
result.Error = fmt.Errorf("target count failed: %w", targetErr)
|
||||
return result
|
||||
}
|
||||
|
||||
result.Match = result.SourceCount == result.TargetCount
|
||||
return result
|
||||
}
|
||||
|
||||
func validateJobs(
|
||||
ctx context.Context,
|
||||
sourceDb dbwrapper.DbWrapper,
|
||||
targetDb dbwrapper.DbWrapper,
|
||||
jobs []config.Job,
|
||||
maxParallelWorkers int,
|
||||
) []ValidationResult {
|
||||
if len(jobs) == 0 {
|
||||
return nil
|
||||
}
|
||||
if maxParallelWorkers <= 0 {
|
||||
maxParallelWorkers = 1
|
||||
}
|
||||
if maxParallelWorkers > len(jobs) {
|
||||
maxParallelWorkers = len(jobs)
|
||||
}
|
||||
|
||||
chJobs := make(chan config.Job, len(jobs))
|
||||
var mu sync.Mutex
|
||||
var results []ValidationResult
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for range maxParallelWorkers {
|
||||
wg.Go(func() {
|
||||
for job := range chJobs {
|
||||
res := validateJob(ctx, sourceDb, targetDb, job)
|
||||
mu.Lock()
|
||||
results = append(results, res)
|
||||
mu.Unlock()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
chJobs <- job
|
||||
}
|
||||
close(chJobs)
|
||||
wg.Wait()
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func printValidationReport(results []ValidationResult) {
|
||||
log.Info("=== VALIDATION REPORT ===")
|
||||
|
||||
var totalMatch, totalMismatch, totalErrors int
|
||||
|
||||
for _, r := range results {
|
||||
if r.Error != nil {
|
||||
log.Errorf("[%s] ERROR: %v", r.JobName, r.Error)
|
||||
totalErrors++
|
||||
continue
|
||||
}
|
||||
|
||||
if !r.Match {
|
||||
totalMismatch++
|
||||
diff := r.TargetCount - r.SourceCount
|
||||
var diffStr string
|
||||
if diff > 0 {
|
||||
diffStr = fmt.Sprintf(" (target has %d extra rows)", diff)
|
||||
} else {
|
||||
diffStr = fmt.Sprintf(" (target is missing %d rows)", -diff)
|
||||
}
|
||||
log.Warnf("[%s] MISMATCH | Source %s: %d | Target %s: %d%s",
|
||||
r.JobName,
|
||||
r.SourceTable, r.SourceCount,
|
||||
r.TargetTable, r.TargetCount,
|
||||
diffStr,
|
||||
)
|
||||
} else {
|
||||
totalMatch++
|
||||
log.Infof("[%s] OK | Source %s: %d | Target %s: %d",
|
||||
r.JobName,
|
||||
r.SourceTable, r.SourceCount,
|
||||
r.TargetTable, r.TargetCount,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("=== Validation complete: %d OK, %d mismatches, %d errors ===", totalMatch, totalMismatch, totalErrors)
|
||||
}
|
||||
35
config-reverse-original.yaml
Normal file
35
config-reverse-original.yaml
Normal file
@@ -0,0 +1,35 @@
|
||||
max_parallel_workers: 4
|
||||
source_db_type: postgres
|
||||
target_db_type: sqlserver
|
||||
|
||||
defaults:
|
||||
batches_per_partition: 4
|
||||
max_extractors: 2
|
||||
extractor_batch_size: 5000
|
||||
extractor_queue_size: 8
|
||||
max_transformers: 2
|
||||
transformer_batch_size: 12500
|
||||
transformer_queue_size: 8
|
||||
max_loaders: 4
|
||||
loader_batch_size: 25000
|
||||
partition_calculation_strategy: EXACT
|
||||
truncate_target: true
|
||||
truncate_method: TRUNCATE
|
||||
retry:
|
||||
attempts: 3
|
||||
base_delay_ms: 500
|
||||
max_delay_ms: 10000
|
||||
max_jitter_ms: 500
|
||||
max_failed_partitions: 5
|
||||
max_failed_batches_load: 5
|
||||
|
||||
jobs:
|
||||
- name: cartografia_manzana_reverse
|
||||
enabled: true
|
||||
source:
|
||||
schema: Cartografia
|
||||
table: MANZANA
|
||||
primary_key: GDB_ARCHIVE_OID
|
||||
target:
|
||||
schema: Cartografia
|
||||
table: MANZANA
|
||||
35
config-reverse.yaml
Normal file
35
config-reverse.yaml
Normal file
@@ -0,0 +1,35 @@
|
||||
max_parallel_workers: 4
|
||||
source_db_type: postgres
|
||||
target_db_type: sqlserver
|
||||
|
||||
defaults:
|
||||
batches_per_partition: 4
|
||||
max_extractors: 8
|
||||
extractor_batch_size: 25000
|
||||
extractor_queue_size: 32
|
||||
max_transformers: 8
|
||||
transformer_batch_size: 50000
|
||||
transformer_queue_size: 32
|
||||
max_loaders: 8
|
||||
loader_batch_size: 50000
|
||||
partition_calculation_strategy: EXACT
|
||||
truncate_target: true
|
||||
truncate_method: TRUNCATE
|
||||
retry:
|
||||
attempts: 3
|
||||
base_delay_ms: 500
|
||||
max_delay_ms: 10000
|
||||
max_jitter_ms: 500
|
||||
max_failed_partitions: 5
|
||||
max_failed_batches_load: 5
|
||||
|
||||
jobs:
|
||||
- name: cartografia_manzana_reverse
|
||||
enabled: true
|
||||
source:
|
||||
schema: Cartografia
|
||||
table: MANZANA
|
||||
primary_key: GDB_ARCHIVE_OID
|
||||
target:
|
||||
schema: Cartografia
|
||||
table: MANZANA
|
||||
91
config.yaml
91
config.yaml
@@ -33,56 +33,45 @@ jobs:
|
||||
target:
|
||||
schema: Cartografia
|
||||
table: MANZANA
|
||||
pre_sql:
|
||||
- 'SELECT 1'
|
||||
range:
|
||||
min: 1000000
|
||||
max: 2000000
|
||||
is_min_inclusive: false
|
||||
is_max_inclusive: true
|
||||
|
||||
- name: red_puerto
|
||||
enabled: true
|
||||
source:
|
||||
schema: Red
|
||||
table: PUERTO
|
||||
primary_key: ID_PUERTO
|
||||
from_json:
|
||||
- column: $node_id*
|
||||
field: id
|
||||
target:
|
||||
schema: Red
|
||||
table: PUERTO
|
||||
pre_sql:
|
||||
- 'SELECT 1'
|
||||
post_sql:
|
||||
- "SELECT 1"
|
||||
# - name: red_puerto
|
||||
# enabled: true
|
||||
# source:
|
||||
# schema: Red
|
||||
# table: PUERTO
|
||||
# primary_key: ID_PUERTO
|
||||
# from_json:
|
||||
# - column: $node_id*
|
||||
# field: id
|
||||
# target:
|
||||
# schema: Red
|
||||
# table: PUERTO
|
||||
|
||||
- name: infraestructura_site_holder__attach
|
||||
source:
|
||||
schema: Infraestructura
|
||||
table: SITE_HOLDER__ATTACH
|
||||
primary_key: GDB_ARCHIVE_OID
|
||||
target:
|
||||
schema: Infraestructura
|
||||
table: SITE_HOLDER__ATTACH
|
||||
to_storage:
|
||||
columns:
|
||||
- source: DATA
|
||||
target: FILE_URL
|
||||
mode: REFERENCE_ONLY
|
||||
prefix: Infraestructura/SITE_HOLDER__ATTACH
|
||||
batches_per_partition: 20
|
||||
max_extractors: 32
|
||||
extractor_batch_size: 1
|
||||
extractor_queue_size: 100
|
||||
max_transformers: 48
|
||||
transformer_batch_size: 500
|
||||
transformer_queue_size: 8
|
||||
max_loaders: 4
|
||||
loader_batch_size: 500
|
||||
retry:
|
||||
attempts: 5
|
||||
base_delay_ms: 1000
|
||||
max_delay_ms: 15000
|
||||
max_jitter_ms: 500
|
||||
# - name: infraestructura_site_holder__attach
|
||||
# source:
|
||||
# schema: Infraestructura
|
||||
# table: SITE_HOLDER__ATTACH
|
||||
# primary_key: GDB_ARCHIVE_OID
|
||||
# target:
|
||||
# schema: Infraestructura
|
||||
# table: SITE_HOLDER__ATTACH
|
||||
# to_storage:
|
||||
# columns:
|
||||
# - source: DATA
|
||||
# target: FILE_URL
|
||||
# mode: REFERENCE_ONLY
|
||||
# prefix: Infraestructura/SITE_HOLDER__ATTACH
|
||||
# batches_per_partition: 20
|
||||
# max_extractors: 32
|
||||
# extractor_batch_size: 1
|
||||
# extractor_queue_size: 100
|
||||
# max_transformers: 48
|
||||
# transformer_batch_size: 500
|
||||
# transformer_queue_size: 8
|
||||
# max_loaders: 4
|
||||
# loader_batch_size: 500
|
||||
# retry:
|
||||
# attempts: 5
|
||||
# base_delay_ms: 1000
|
||||
# max_delay_ms: 15000
|
||||
# max_jitter_ms: 500
|
||||
|
||||
2
docker/.gitignore
vendored
Normal file
2
docker/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
data/**/*
|
||||
compose.override.yml
|
||||
50
docker/compose.yml
Normal file
50
docker/compose.yml
Normal file
@@ -0,0 +1,50 @@
|
||||
name: db-migration
|
||||
services:
|
||||
azurite:
|
||||
image: mcr.microsoft.com/azure-storage/azurite:3.35.0
|
||||
container_name: azurite
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- 8880:10000
|
||||
- 8881:10001
|
||||
- 8882:10002
|
||||
volumes:
|
||||
- ./data/azurite:/data
|
||||
command: 'azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0 --location /data --skipApiVersionCheck'
|
||||
profiles:
|
||||
- storage
|
||||
- target
|
||||
|
||||
mssql:
|
||||
image: mcr.microsoft.com/mssql/server:2022-latest
|
||||
restart: unless-stopped
|
||||
environment:
|
||||
ACCEPT_EULA: Y
|
||||
MSSQL_SA_PASSWORD: SecurePassword123
|
||||
MSSQL_PID: Developer
|
||||
MSSQL_MEMORY_LIMIT_MB: 8192
|
||||
ports:
|
||||
- 8883:1433
|
||||
volumes:
|
||||
- ./data/mssql:/var/opt/mssql
|
||||
profiles:
|
||||
- mssql
|
||||
- source
|
||||
- db
|
||||
|
||||
postgres:
|
||||
image: postgis/postgis:16-3.4
|
||||
restart: unless-stopped
|
||||
environment:
|
||||
POSTGRES_DB: test_db
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_PASSWORD: SecurePassword123
|
||||
ports:
|
||||
- 8884:5432
|
||||
volumes:
|
||||
- ./data/postgres:/var/lib/postgresql/data
|
||||
profiles:
|
||||
- postgres
|
||||
- target
|
||||
- db
|
||||
shm_size: '1gb'
|
||||
@@ -70,6 +70,15 @@ func (c *Client) UploadBuffer(ctx context.Context, containerName, blobPath strin
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) Ping(ctx context.Context) error {
|
||||
pager := c.client.NewListBlobsFlatPager(c.azureStorageConfig.Container, nil)
|
||||
_, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("storage access check failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []byte) (string, error) {
|
||||
if blobPath == "" || buffer == nil {
|
||||
return "", ErrInvalidInput
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"maps"
|
||||
"net/url"
|
||||
|
||||
"github.com/ilyakaznacheev/cleanenv"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -16,10 +20,95 @@ type AzureStorageConfig struct {
|
||||
}
|
||||
|
||||
type appConfig struct {
|
||||
SourceDbUrl string `env:"SOURCE_DB_URL" env-required:"true"`
|
||||
TargetDbUrl string `env:"TARGET_DB_URL" env-required:"true"`
|
||||
LogLevel string `env:"LOG_LEVEL" env-default:"INFO"`
|
||||
AzureStorage AzureStorageConfig
|
||||
SourceDbUrl string `env:"SOURCE_DB_URL"`
|
||||
SourceDbHost string `env:"SOURCE_DB_HOST"`
|
||||
SourceDbPort string `env:"SOURCE_DB_PORT"`
|
||||
SourceDbName string `env:"SOURCE_DB_NAME"`
|
||||
SourceDbUser string `env:"SOURCE_DB_USER"`
|
||||
SourceDbPwd string `env:"SOURCE_DB_PWD"`
|
||||
SourceDbOptions string `env:"SOURCE_DB_OPTIONS"`
|
||||
TargetDbUrl string `env:"TARGET_DB_URL"`
|
||||
TargetDbHost string `env:"TARGET_DB_HOST"`
|
||||
TargetDbPort string `env:"TARGET_DB_PORT"`
|
||||
TargetDbName string `env:"TARGET_DB_NAME"`
|
||||
TargetDbUser string `env:"TARGET_DB_USER"`
|
||||
TargetDbPwd string `env:"TARGET_DB_PWD"`
|
||||
TargetDbOptions string `env:"TARGET_DB_OPTIONS"`
|
||||
LogLevel string `env:"LOG_LEVEL" env-default:"INFO"`
|
||||
AzureStorage AzureStorageConfig
|
||||
}
|
||||
|
||||
func (c *appConfig) ResolveSourceDbUrl(dbType string) (string, error) {
|
||||
if c.SourceDbUrl != "" {
|
||||
return c.SourceDbUrl, nil
|
||||
}
|
||||
u, err := buildDbUrl(dbType, c.SourceDbHost, c.SourceDbPort, c.SourceDbName, c.SourceDbUser, c.SourceDbPwd, c.SourceDbOptions)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("source DB: %w", err)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func (c *appConfig) ResolveTargetDbUrl(dbType string) (string, error) {
|
||||
if c.TargetDbUrl != "" {
|
||||
return c.TargetDbUrl, nil
|
||||
}
|
||||
u, err := buildDbUrl(dbType, c.TargetDbHost, c.TargetDbPort, c.TargetDbName, c.TargetDbUser, c.TargetDbPwd, c.TargetDbOptions)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("target DB: %w", err)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func buildDbUrl(dbType, host, port, name, user, pwd, options string) (string, error) {
|
||||
if host == "" {
|
||||
return "", fmt.Errorf("DB_HOST is required when DB_URL is not set")
|
||||
}
|
||||
if name == "" {
|
||||
return "", fmt.Errorf("DB_NAME is required when DB_URL is not set")
|
||||
}
|
||||
if user == "" {
|
||||
return "", fmt.Errorf("DB_USER is required when DB_URL is not set")
|
||||
}
|
||||
|
||||
switch dbType {
|
||||
case "sqlserver":
|
||||
if port == "" {
|
||||
port = "1433"
|
||||
}
|
||||
q := url.Values{}
|
||||
if options != "" {
|
||||
extra, err := url.ParseQuery(options)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("invalid DB_OPTIONS: %w", err)
|
||||
}
|
||||
maps.Copy(q, extra)
|
||||
}
|
||||
q.Set("database", name)
|
||||
u := &url.URL{
|
||||
Scheme: "sqlserver",
|
||||
Host: host + ":" + port,
|
||||
User: url.UserPassword(user, pwd),
|
||||
RawQuery: q.Encode(),
|
||||
}
|
||||
return u.String(), nil
|
||||
|
||||
case "postgres":
|
||||
if port == "" {
|
||||
port = "5432"
|
||||
}
|
||||
u := &url.URL{
|
||||
Scheme: "postgres",
|
||||
Host: host + ":" + port,
|
||||
User: url.UserPassword(user, pwd),
|
||||
Path: "/" + name,
|
||||
RawQuery: options,
|
||||
}
|
||||
return u.String(), nil
|
||||
|
||||
default:
|
||||
return "", fmt.Errorf("unknown db type %q — cannot build URL from individual components", dbType)
|
||||
}
|
||||
}
|
||||
|
||||
func getAppConfig() appConfig {
|
||||
|
||||
@@ -2,6 +2,7 @@ package table_analyzers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -9,6 +10,7 @@ import (
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type PostgresTableAnalyzer struct {
|
||||
@@ -161,7 +163,30 @@ func (ta *PostgresTableAnalyzer) EstimateTotalRows(
|
||||
ctx context.Context,
|
||||
tableInfo config.TableInfo,
|
||||
) (int64, error) {
|
||||
return 0, nil
|
||||
query := `
|
||||
SELECT reltuples::bigint
|
||||
FROM pg_class
|
||||
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
|
||||
WHERE pg_namespace.nspname = $1 AND pg_class.relname = $2`
|
||||
|
||||
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
var estimate int64
|
||||
err := ta.db.QueryRow(ctxTimeout, query, tableInfo.Schema, tableInfo.Table).Scan(&estimate)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if estimate < 0 {
|
||||
countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM "%s"."%s"`, tableInfo.Schema, tableInfo.Table)
|
||||
err = ta.db.QueryRow(ctxTimeout, countQuery).Scan(&estimate)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
return estimate, nil
|
||||
}
|
||||
|
||||
func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn(
|
||||
@@ -169,7 +194,19 @@ func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn(
|
||||
tableInfo config.TableInfo,
|
||||
columnName string,
|
||||
) (etl.MaxMinColumnResult, error) {
|
||||
return etl.MaxMinColumnResult{}, nil
|
||||
query := fmt.Sprintf(`SELECT MIN("%s"), MAX("%s") FROM "%s"."%s"`,
|
||||
columnName, columnName, tableInfo.Schema, tableInfo.Table)
|
||||
|
||||
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
result := etl.MaxMinColumnResult{}
|
||||
err := ta.db.QueryRow(ctxTimeout, query).Scan(&result.Min, &result.Max)
|
||||
if err != nil {
|
||||
return etl.MaxMinColumnResult{}, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
|
||||
@@ -179,5 +216,78 @@ func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
|
||||
maxPartitions int64,
|
||||
rangeConstraint config.RangeConfig,
|
||||
) ([]models.Partition, error) {
|
||||
return []models.Partition{}, nil
|
||||
whereClause := ""
|
||||
args := []any{maxPartitions}
|
||||
|
||||
if rangeConstraint.Min != nil || rangeConstraint.Max != nil {
|
||||
var conditions []string
|
||||
if rangeConstraint.Min != nil {
|
||||
minOp := ">"
|
||||
if rangeConstraint.IsMinInclusive {
|
||||
minOp = ">="
|
||||
}
|
||||
args = append(args, *rangeConstraint.Min)
|
||||
conditions = append(conditions, fmt.Sprintf(`"%s" %s $%d`, partitionColumn, minOp, len(args)))
|
||||
}
|
||||
if rangeConstraint.Max != nil {
|
||||
maxOp := "<"
|
||||
if rangeConstraint.IsMaxInclusive {
|
||||
maxOp = "<="
|
||||
}
|
||||
args = append(args, *rangeConstraint.Max)
|
||||
conditions = append(conditions, fmt.Sprintf(`"%s" %s $%d`, partitionColumn, maxOp, len(args)))
|
||||
}
|
||||
whereClause = "WHERE " + strings.Join(conditions, " AND ")
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT MIN("%s") AS lower_limit, MAX("%s") AS upper_limit
|
||||
FROM (
|
||||
SELECT "%s", NTILE($1) OVER (ORDER BY "%s") AS batch_id
|
||||
FROM "%s"."%s" %s
|
||||
) AS t
|
||||
GROUP BY batch_id
|
||||
ORDER BY batch_id`,
|
||||
partitionColumn,
|
||||
partitionColumn,
|
||||
partitionColumn,
|
||||
partitionColumn,
|
||||
tableInfo.Schema,
|
||||
tableInfo.Table,
|
||||
whereClause)
|
||||
|
||||
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
rows, err := ta.db.Query(ctxTimeout, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
partitions := make([]models.Partition, 0, maxPartitions)
|
||||
|
||||
for rows.Next() {
|
||||
partition := models.Partition{
|
||||
Id: uuid.New(),
|
||||
HasRange: true,
|
||||
RetryCounter: 0,
|
||||
Range: models.PartitionRange{
|
||||
IsMinInclusive: true,
|
||||
IsMaxInclusive: true,
|
||||
},
|
||||
}
|
||||
|
||||
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
partitions = append(partitions, partition)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return partitions, nil
|
||||
}
|
||||
|
||||
@@ -59,6 +59,65 @@ func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransfor
|
||||
return plan
|
||||
}
|
||||
|
||||
func computePostgresTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
||||
var plan []etl.ColumnTransformPlan
|
||||
|
||||
for i, col := range columns {
|
||||
switch col.SystemType() {
|
||||
case "int2", "int4", "int8", "integer", "smallint", "bigint":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if v64, ok := ToInt64(v); ok {
|
||||
return v64, nil
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "uuid":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
switch b := v.(type) {
|
||||
case []byte:
|
||||
if b != nil {
|
||||
return bigEndianToMssqlUuid(b)
|
||||
}
|
||||
case [16]byte:
|
||||
return bigEndianToMssqlUuid(b[:])
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "geometry":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return ewkbToMssqlGeo(b, false)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "geography":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return ewkbToMssqlGeo(b, true)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return plan
|
||||
}
|
||||
|
||||
func computeStorageTransformationPlan(
|
||||
ctx context.Context,
|
||||
azureClient *azure.Client,
|
||||
|
||||
72
internal/app/etl/transformers/postgres.go
Normal file
72
internal/app/etl/transformers/postgres.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package transformers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
type PostgresTransformer struct {
|
||||
sourceTable config.SourceTableInfo
|
||||
}
|
||||
|
||||
func NewPostgresTransformer(sourceTable config.SourceTableInfo) etl.Transformer {
|
||||
return &PostgresTransformer{sourceTable: sourceTable}
|
||||
}
|
||||
|
||||
func (pgTr *PostgresTransformer) Consume(
|
||||
ctx context.Context,
|
||||
columns []models.ColumnType,
|
||||
retryConfig config.RetryConfig,
|
||||
batchSize int,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
) {
|
||||
transformationPlan := computePostgresTransformationPlan(columns)
|
||||
|
||||
acc := &batchAccumulator{batchSize: batchSize}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
acc.flush(ctx, chBatchesOut, wgActiveBatches)
|
||||
return
|
||||
}
|
||||
|
||||
if len(transformationPlan) > 0 {
|
||||
if err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig); err != nil {
|
||||
sendTransformError(ctx, err, chJobErrorsOut)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if batchSize <= 0 {
|
||||
wgActiveBatches.Add(1)
|
||||
select {
|
||||
case chBatchesOut <- batch:
|
||||
case <-ctx.Done():
|
||||
wgActiveBatches.Done()
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
acc.add(batch)
|
||||
if acc.ready() {
|
||||
if !acc.flush(ctx, chBatchesOut, wgActiveBatches) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
mssqlclrgeo "github.com/gaspardle/go-mssqlclrgeo"
|
||||
)
|
||||
|
||||
func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
|
||||
@@ -62,6 +64,29 @@ func ensureUTC(t time.Time) time.Time {
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
|
||||
}
|
||||
|
||||
func bigEndianToMssqlUuid(pgUuid []byte) ([]byte, error) {
|
||||
if len(pgUuid) != 16 {
|
||||
return nil, errors.New("Invalid uuid")
|
||||
}
|
||||
|
||||
mssqlUuid := make([]byte, 16)
|
||||
mssqlUuid[0], mssqlUuid[1], mssqlUuid[2], mssqlUuid[3] = pgUuid[3], pgUuid[2], pgUuid[1], pgUuid[0]
|
||||
mssqlUuid[4], mssqlUuid[5] = pgUuid[5], pgUuid[4]
|
||||
mssqlUuid[6], mssqlUuid[7] = pgUuid[7], pgUuid[6]
|
||||
copy(mssqlUuid[8:], pgUuid[8:])
|
||||
|
||||
return mssqlUuid, nil
|
||||
}
|
||||
|
||||
func ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error) {
|
||||
if len(ewkb) < 5 {
|
||||
return nil, errors.New("Invalid ewkb")
|
||||
}
|
||||
// mssqlclrgeo reads the SRID flag and bytes directly from EWKB,
|
||||
// so no pre-processing needed — pass through as-is.
|
||||
return mssqlclrgeo.WkbToUdtGeo(ewkb, isGeography)
|
||||
}
|
||||
|
||||
func ToInt64(v any) (int64, bool) {
|
||||
switch t := v.(type) {
|
||||
case int:
|
||||
|
||||
137
openspec/changes/bidirectional-transforms/plan.md
Normal file
137
openspec/changes/bidirectional-transforms/plan.md
Normal file
@@ -0,0 +1,137 @@
|
||||
# Plan: Bidirectional Transformation Support
|
||||
|
||||
## Goal
|
||||
|
||||
Make the transformation pipeline direction-aware. Currently hardcoded to MSSQL → PG; add support for PG → MSSQL by applying inverse transformations when `SourceDbType == "postgres"`.
|
||||
|
||||
Excluded: `to_storage` Azure blob upload (not reversible).
|
||||
|
||||
---
|
||||
|
||||
## Hardcoded wiring to fix
|
||||
|
||||
| File | Line | Change |
|
||||
|---|---|---|
|
||||
| `cmd/go_migrate/process.go` | 51 | Branch on `SourceDbType`: `"sqlserver"` → `NewMssqlTransformer`, `"postgres"` → `NewPostgresTransformer` |
|
||||
| `cmd/go_migrate/main.go` | 166–167 | Branch on source/target type for both `TableAnalyzer` selections |
|
||||
|
||||
---
|
||||
|
||||
## Transformations
|
||||
|
||||
### Forward (MSSQL → PG) — unchanged
|
||||
|
||||
| Column type | Function | File |
|
||||
|---|---|---|
|
||||
| `uniqueidentifier` | `mssqlUuidToBigEndian` | `utils.go:9` |
|
||||
| `geometry`/`geography` | `wkbToEwkbWithSrid` | `utils.go:25` |
|
||||
| `datetime`/`datetime2` | `ensureUTC` | `utils.go:57` |
|
||||
|
||||
### Inverse (PG → MSSQL) — new
|
||||
|
||||
| PG system type | Action |
|
||||
|---|---|
|
||||
| `uuid` | `bigEndianToMssqlUuid`: re-swap bytes [0-3], [4-5], [6-7] |
|
||||
| `geometry` | `ewkbToMssqlGeo(v, false)`: strip SRID → WKB → `WkbToUdtGeo` |
|
||||
| `geography` | `ewkbToMssqlGeo(v, true)`: strip SRID → WKB → `WkbToUdtGeo` |
|
||||
| `timestamp`/`timestamptz` | no-op |
|
||||
|
||||
**Geometry note**: MSSQL rejects plain WKB via bulk protocol. Must use `mssqlclrgeo.WkbToUdtGeo(wkb, isGeography)` (already in go.mod). PG extractor already emits EWKB via `ST_AsEWKB()`.
|
||||
|
||||
---
|
||||
|
||||
## New utility functions (`transformers/utils.go`)
|
||||
|
||||
### `bigEndianToMssqlUuid(v []byte) []byte`
|
||||
```
|
||||
out[0..3] = v[3,2,1,0]
|
||||
out[4..5] = v[5,4]
|
||||
out[6..7] = v[7,6]
|
||||
out[8..15] = v[8..15]
|
||||
```
|
||||
|
||||
### `ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error)`
|
||||
1. Read byte-order flag from `ewkb[0]`
|
||||
2. Read geometry type word bytes [1..4]
|
||||
3. If SRID flag (`0x20000000`) is set: strip bytes [5..8], clear flag in type word
|
||||
4. Call `mssqlclrgeo.WkbToUdtGeo(wkb, isGeography)`
|
||||
|
||||
---
|
||||
|
||||
## New files
|
||||
|
||||
### `transformers/postgres.go`
|
||||
```go
|
||||
func NewPostgresTransformer(...) *Transformer {
|
||||
// same signature as NewMssqlTransformer
|
||||
// calls computePostgresTransformationPlan instead
|
||||
// does NOT call computeStorageTransformationPlan
|
||||
}
|
||||
```
|
||||
|
||||
### `computePostgresTransformationPlan` in `transformers/plan.go`
|
||||
Iterates `sourceColTypes` (from PG analyzer), applies inverse closures by system type.
|
||||
|
||||
---
|
||||
|
||||
## PostgreSQL table analyzer stubs to implement (`table_analyzers/postgres.go`)
|
||||
|
||||
Required for PG-as-source partitioned extraction:
|
||||
|
||||
### `EstimateTotalRows`
|
||||
```sql
|
||||
SELECT reltuples::bigint FROM pg_class
|
||||
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
|
||||
WHERE pg_namespace.nspname = $schema AND pg_class.relname = $table
|
||||
```
|
||||
Fallback to `COUNT(*)` if `reltuples < 0`.
|
||||
|
||||
### `QueryMaxMinFromColumn`
|
||||
```sql
|
||||
SELECT MIN("col"), MAX("col") FROM "schema"."table"
|
||||
```
|
||||
|
||||
### `CalculatePartitionRanges`
|
||||
Use min/max from above + `rowsPerPartition` to compute boundaries. Mirror the logic from `MssqlTableAnalyzer.CalculatePartitionRanges`.
|
||||
|
||||
---
|
||||
|
||||
## Test cases
|
||||
|
||||
### TC-1: `bigEndianToMssqlUuid` — round-trip
|
||||
- Input: run `mssqlUuidToBigEndian` on a known 16-byte MSSQL UUID → produces PG UUID
|
||||
- Assert: `bigEndianToMssqlUuid(pgUUID)` == original MSSQL UUID bytes
|
||||
- Also assert nil input → nil output (no panic)
|
||||
|
||||
### TC-2: `bigEndianToMssqlUuid` — known vector
|
||||
- Input: `[0x6b,0xa7,0xb8,0x10, 0x9d,0xad, 0x11,0xd1, 0x80,0xb4,0x00,0xc0,0x4f,0xd4,0x30,0xc8]` (RFC 4122 nil UUID variant)
|
||||
- Assert: bytes [0-3] are reversed, [4-5] reversed, [6-7] reversed, [8-15] identical
|
||||
|
||||
### TC-3: `ewkbToMssqlGeo` — geometry round-trip
|
||||
- Input: generate a polygon via `go-geom` + `wkb.Marshal` → plain WKB
|
||||
- Forward: run `wkbToEwkbWithSrid` → EWKB
|
||||
- Inverse: run `ewkbToMssqlGeo(ewkb, false)` → CLR/UDT bytes
|
||||
- Assert: no error, output is non-empty `[]byte`
|
||||
|
||||
### TC-4: `ewkbToMssqlGeo` — nil input
|
||||
- Input: nil
|
||||
- Assert: returns nil, nil (no panic)
|
||||
|
||||
### TC-5: `ewkbToMssqlGeo` — EWKB without SRID flag
|
||||
- Input: plain WKB (no SRID flag set)
|
||||
- Assert: function still calls `WkbToUdtGeo` and returns without error
|
||||
|
||||
### TC-6: Transformer factory selection
|
||||
- Given `SourceDbType == "postgres"` → `NewPostgresTransformer` is selected
|
||||
- Given `SourceDbType == "sqlserver"` → `NewMssqlTransformer` is selected
|
||||
|
||||
---
|
||||
|
||||
## Files changed (summary)
|
||||
|
||||
1. `cmd/go_migrate/process.go` — transformer factory branch
|
||||
2. `cmd/go_migrate/main.go` — analyzer selection branch
|
||||
3. `internal/app/etl/transformers/utils.go` — 2 new functions
|
||||
4. `internal/app/etl/transformers/plan.go` — `computePostgresTransformationPlan`
|
||||
5. `internal/app/etl/transformers/postgres.go` *(new)*
|
||||
6. `internal/app/etl/table_analyzers/postgres.go` — 3 stub implementations
|
||||
@@ -12,10 +12,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// totalRows int = 1_000_000
|
||||
totalRows int = 1000
|
||||
chunkSize int = 200
|
||||
queueSize int = 4
|
||||
totalRows int = 2_000_000
|
||||
chunkSize int = 5000
|
||||
queueSize int = 8
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -41,13 +40,13 @@ func main() {
|
||||
seedManzanas(ctx, db)
|
||||
})
|
||||
|
||||
wgSeed.Go(func() {
|
||||
seedPuertos(ctx, db)
|
||||
})
|
||||
// wgSeed.Go(func() {
|
||||
// seedPuertos(ctx, db)
|
||||
// })
|
||||
|
||||
wgSeed.Go(func() {
|
||||
seedSiteHolderAttach(ctx, db)
|
||||
})
|
||||
// wgSeed.Go(func() {
|
||||
// seedSiteHolderAttach(ctx, db)
|
||||
// })
|
||||
|
||||
wgSeed.Wait()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user