Compare commits
6 Commits
b386965bb8
...
refactor/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
86258718d8
|
|||
|
13cd02a824
|
|||
|
f844004942
|
|||
|
4ba26092a9
|
|||
|
537b7fbd28
|
|||
|
d534314cff
|
@@ -1,24 +0,0 @@
|
|||||||
# Skill Registry — go-migrate
|
|
||||||
|
|
||||||
Generated: 2026-04-21
|
|
||||||
|
|
||||||
## Compact Rules
|
|
||||||
|
|
||||||
### Go conventions
|
|
||||||
- Use existing error wrapping pattern: `fmt.Errorf("context: %w", err)`
|
|
||||||
- Channel-based pipeline — keep goroutine lifecycle clean (close channels in correct order)
|
|
||||||
- No comments unless non-obvious WHY; no docstrings
|
|
||||||
- Prefer named returns only when it aids clarity in short functions
|
|
||||||
- Use `strings.EqualFold` for case-insensitive column name comparison
|
|
||||||
|
|
||||||
### Project conventions
|
|
||||||
- Config structs live in `internal/app/config/`
|
|
||||||
- ETL interfaces live in `internal/app/etl/types.go`
|
|
||||||
- Transformer implementations in `internal/app/etl/transformers/`
|
|
||||||
- Azure operations via `internal/app/azure/main.go`
|
|
||||||
- Per-job transformer creation (not shared) when job has storage config
|
|
||||||
|
|
||||||
## User Skills
|
|
||||||
| Trigger | Skill |
|
|
||||||
|---------|-------|
|
|
||||||
| sdd-* | SDD workflow skills |
|
|
||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -29,3 +29,4 @@ go.work.sum
|
|||||||
# .idea/
|
# .idea/
|
||||||
.vscode/
|
.vscode/
|
||||||
.temp
|
.temp
|
||||||
|
.atl
|
||||||
|
|||||||
92
README.md
Normal file
92
README.md
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
# go-migrate
|
||||||
|
|
||||||
|
Migrador de datos entre SQL Server y PostgreSQL con procesamiento en paralelo.
|
||||||
|
|
||||||
|
## Compilar
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go build -o go-migrate ./cmd/go_migrate
|
||||||
|
```
|
||||||
|
|
||||||
|
## Uso
|
||||||
|
|
||||||
|
```bash
|
||||||
|
./go-migrate [opciones] [<ruta-config>]
|
||||||
|
```
|
||||||
|
|
||||||
|
### Opciones
|
||||||
|
|
||||||
|
| Flag | Descripción |
|
||||||
|
|------|-------------|
|
||||||
|
| `-config <path>` | Ruta al archivo de configuración YAML. También se puede pasar como argumento posicional. Si no se indica, se busca `config.yaml`. |
|
||||||
|
| `-validate` | Compara la cantidad de filas entre origen y destino por cada job. No migra datos. |
|
||||||
|
| `-dry-run` | Valida conexiones, acceso a storage (si aplica) y cuenta filas en origen sin migrar. |
|
||||||
|
|
||||||
|
### Ejemplos
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Migrar con config.yaml por defecto
|
||||||
|
./go-migrate
|
||||||
|
|
||||||
|
# Usar un archivo de configuración específico
|
||||||
|
./go-migrate -config produccion.yaml
|
||||||
|
|
||||||
|
# Validar que origen y destino tengan la misma cantidad de filas
|
||||||
|
./go-migrate -validate -config produccion.yaml
|
||||||
|
|
||||||
|
# Verificar conectividad sin migrar
|
||||||
|
./go-migrate -dry-run -config produccion.yaml
|
||||||
|
```
|
||||||
|
|
||||||
|
## Configuración
|
||||||
|
|
||||||
|
La herramienta lee credenciales y parámetros desde variables de entorno o un archivo `.env`.
|
||||||
|
|
||||||
|
### Variables clave
|
||||||
|
|
||||||
|
| Variable | Descripción |
|
||||||
|
|----------|-------------|
|
||||||
|
| `SOURCE_DB_URL` | URL de conexión a la base de datos origen (o `SOURCE_DB_HOST`, `SOURCE_DB_NAME`, `SOURCE_DB_USER`, `SOURCE_DB_PWD`). |
|
||||||
|
| `TARGET_DB_URL` | URL de conexión a la base de datos destino (o `TARGET_DB_HOST`, `TARGET_DB_NAME`, `TARGET_DB_USER`, `TARGET_DB_PWD`). |
|
||||||
|
| `LOG_LEVEL` | Nivel de log: `DEBUG`, `INFO`, `WARN`, `ERROR` (por defecto: `INFO`). |
|
||||||
|
|
||||||
|
Para migrar datos binarios a Azure Blob, también se requieren `AZ_STORAGE_ENABLED`, `AZ_ACCOUNT_NAME`, `AZ_CONTAINER`, `AZ_ACCOUNT_KEY`.
|
||||||
|
|
||||||
|
### Archivo de migración (YAML)
|
||||||
|
|
||||||
|
Define los jobs de migración. Ejemplo mínimo:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
source_db_type: sqlserver
|
||||||
|
target_db_type: postgres
|
||||||
|
max_parallel_workers: 4
|
||||||
|
|
||||||
|
defaults:
|
||||||
|
batches_per_partition: 10
|
||||||
|
extractor_batch_size: 1000
|
||||||
|
max_extractors: 2
|
||||||
|
max_loaders: 2
|
||||||
|
retry:
|
||||||
|
attempts: 3
|
||||||
|
base_delay_ms: 500
|
||||||
|
max_delay_ms: 5000
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
- name: migrar_usuarios
|
||||||
|
enabled: true
|
||||||
|
source:
|
||||||
|
schema: dbo
|
||||||
|
table: Usuarios
|
||||||
|
primary_key: Id
|
||||||
|
target:
|
||||||
|
schema: public
|
||||||
|
table: usuarios
|
||||||
|
```
|
||||||
|
|
||||||
|
Consulta el archivo `config.yaml` de tu entorno para ver los jobs disponibles y sus parámetros específicos.
|
||||||
|
|
||||||
|
## Modos de ejecución
|
||||||
|
|
||||||
|
- **Migración** (por defecto): extrae, transforma y carga datos en paralelo.
|
||||||
|
- **Validación** (`-validate`): cuenta y compara filas entre origen y destino.
|
||||||
|
- **Dry run** (`-dry-run`): verifica conexiones y muestra la cantidad de filas en origen.
|
||||||
75
benchmark-results.md
Normal file
75
benchmark-results.md
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
# Benchmark go-migrate — 2,000,000 filas
|
||||||
|
|
||||||
|
**Tabla**: `Cartografia.MANZANA`
|
||||||
|
**Fecha**: 2026-05-29
|
||||||
|
**Entorno**: Docker local (MSSQL 2022 Developer / PostgreSQL 16 + PostGIS)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Resultado final — 5 pasadas cada dirección
|
||||||
|
|
||||||
|
| Métrica | MSSQL → PostgreSQL | PostgreSQL → MSSQL |
|
||||||
|
|---|---|---|
|
||||||
|
| **Promedio** | **8.37s** | **16.77s** |
|
||||||
|
| **Mediana** | 8.16s | 16.33s |
|
||||||
|
| **Mínimo** | 7.75s | 16.03s |
|
||||||
|
| **Máximo** | 9.17s | 18.46s |
|
||||||
|
| **Desv. estándar** | 0.56s | 1.01s |
|
||||||
|
| **Throughput promedio** | **~238,892 filas/seg** | **~119,261 filas/seg** |
|
||||||
|
| **Factor** | 1x | **~2x más lento** |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Evolución del tuning PG → MSSQL
|
||||||
|
|
||||||
|
| Etapa | Config | Tiempo | Throughput | Δ |
|
||||||
|
|---|---|---|---|---|
|
||||||
|
| Corrida 1 — original | conservadora | 236.8s | ~8,446 /seg | baseline |
|
||||||
|
| Corrida 2 — igualada | mismos parámetros | 21.94s | ~91,148 /seg | +10.8x |
|
||||||
|
| Tuning A | 4ext/8load 50k | 17.37s | ~115,200 /seg | +1.27x |
|
||||||
|
| Tuning C | 16 loaders | 17.26s | ~115,900 /seg | +1.28x |
|
||||||
|
| **Tuning D — óptimo** | **8ext/8load 50k** | **~16.77s** | **~119,261 /seg** | **+1.37x** |
|
||||||
|
| Tablock + 8 loaders | lock exclusivo serial | ~44s | ~45,000 /seg | ❌ regresión |
|
||||||
|
| Tablock + 1 loader | minimal logging | ~47s | ~42,000 /seg | ❌ regresión |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Configuración óptima — `config-reverse.yaml`
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
max_parallel_workers: 4
|
||||||
|
defaults:
|
||||||
|
batches_per_partition: 4
|
||||||
|
max_extractors: 8 # ← mayor lever de mejora
|
||||||
|
extractor_batch_size: 25000
|
||||||
|
extractor_queue_size: 32
|
||||||
|
max_transformers: 8
|
||||||
|
transformer_batch_size: 50000
|
||||||
|
transformer_queue_size: 32
|
||||||
|
max_loaders: 8
|
||||||
|
loader_batch_size: 50000 # sweet spot — 75k y 100k peores
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Análisis de la brecha final (~2x)
|
||||||
|
|
||||||
|
La diferencia residual entre ambas direcciones es estructural y está en el protocolo de escritura:
|
||||||
|
|
||||||
|
| Protocolo | Mecanismo | Overhead |
|
||||||
|
|---|---|---|
|
||||||
|
| `pgx.CopyFrom` (→ PG) | PostgreSQL COPY protocol — streaming binario sin SQL | mínimo |
|
||||||
|
| `mssql.CopyIn` (→ MSSQL) | BCP protocol — row-by-row dentro de un bulk statement | mayor por fila |
|
||||||
|
|
||||||
|
`mssql.CopyIn` itera fila a fila via `stmt.ExecContext(row...)` antes del flush final, lo que introduce overhead por fila independientemente del batch size. `pgx.CopyFrom` hace streaming puro.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Hallazgos sobre Tablock
|
||||||
|
|
||||||
|
`Tablock: true` en `mssql.BulkOptions` resultó contraproducente en ambos escenarios:
|
||||||
|
|
||||||
|
- **Con 8 loaders paralelos**: cada loader compite por un lock exclusivo de tabla → serialización completa (~44s)
|
||||||
|
- **Con 1 loader + batch enorme**: sin contención de locks, pero overhead de log + gestión de la lock exclusiva superó el beneficio de minimal logging (~47s)
|
||||||
|
|
||||||
|
**Conclusión**: para este patrón de carga (múltiples loaders concurrentes), `Tablock: false` (default) es siempre mejor.
|
||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
||||||
@@ -17,6 +18,13 @@ import (
|
|||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func newTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
|
||||||
|
if db.GetDialect() == "postgres" {
|
||||||
|
return table_analyzers.NewPostgresTableAnalyzer(db)
|
||||||
|
}
|
||||||
|
return table_analyzers.NewMssqlTableAnalyzer(db)
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
configureLog()
|
configureLog()
|
||||||
checkExpiry()
|
checkExpiry()
|
||||||
@@ -163,8 +171,8 @@ func processMigrationJobs(
|
|||||||
chJobs := make(chan config.Job, len(jobs))
|
chJobs := make(chan config.Job, len(jobs))
|
||||||
var wgJobs sync.WaitGroup
|
var wgJobs sync.WaitGroup
|
||||||
|
|
||||||
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
|
sourceTableAnalyzer := newTableAnalyzer(sourceDb)
|
||||||
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
targetTableAnalyzer := newTableAnalyzer(targetDb)
|
||||||
extractor := extractors.NewExtractor(sourceDb)
|
extractor := extractors.NewExtractor(sourceDb)
|
||||||
loader := loaders.NewGenericLoader(targetDb)
|
loader := loaders.NewGenericLoader(targetDb)
|
||||||
|
|
||||||
@@ -181,6 +189,7 @@ func processMigrationJobs(
|
|||||||
azureClient,
|
azureClient,
|
||||||
loader,
|
loader,
|
||||||
job,
|
job,
|
||||||
|
sourceDb.GetDialect(),
|
||||||
targetDb.GetDialect(),
|
targetDb.GetDialect(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -46,9 +46,15 @@ func processMigrationJob(
|
|||||||
azureClient *azure.Client,
|
azureClient *azure.Client,
|
||||||
loader loaders.GenericLoader,
|
loader loaders.GenericLoader,
|
||||||
job config.Job,
|
job config.Job,
|
||||||
|
sourceDbType string,
|
||||||
targetDbType string,
|
targetDbType string,
|
||||||
) models.JobResult {
|
) models.JobResult {
|
||||||
transformer := transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
|
var transformer etl.Transformer
|
||||||
|
if sourceDbType == "postgres" {
|
||||||
|
transformer = transformers.NewPostgresTransformer(job.SourceTable)
|
||||||
|
} else {
|
||||||
|
transformer = transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
|
||||||
|
}
|
||||||
localCtx, cancel := context.WithCancel(ctx)
|
localCtx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
|||||||
35
config-reverse-original.yaml
Normal file
35
config-reverse-original.yaml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
max_parallel_workers: 4
|
||||||
|
source_db_type: postgres
|
||||||
|
target_db_type: sqlserver
|
||||||
|
|
||||||
|
defaults:
|
||||||
|
batches_per_partition: 4
|
||||||
|
max_extractors: 2
|
||||||
|
extractor_batch_size: 5000
|
||||||
|
extractor_queue_size: 8
|
||||||
|
max_transformers: 2
|
||||||
|
transformer_batch_size: 12500
|
||||||
|
transformer_queue_size: 8
|
||||||
|
max_loaders: 4
|
||||||
|
loader_batch_size: 25000
|
||||||
|
partition_calculation_strategy: EXACT
|
||||||
|
truncate_target: true
|
||||||
|
truncate_method: TRUNCATE
|
||||||
|
retry:
|
||||||
|
attempts: 3
|
||||||
|
base_delay_ms: 500
|
||||||
|
max_delay_ms: 10000
|
||||||
|
max_jitter_ms: 500
|
||||||
|
max_failed_partitions: 5
|
||||||
|
max_failed_batches_load: 5
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
- name: cartografia_manzana_reverse
|
||||||
|
enabled: true
|
||||||
|
source:
|
||||||
|
schema: Cartografia
|
||||||
|
table: MANZANA
|
||||||
|
primary_key: GDB_ARCHIVE_OID
|
||||||
|
target:
|
||||||
|
schema: Cartografia
|
||||||
|
table: MANZANA
|
||||||
35
config-reverse.yaml
Normal file
35
config-reverse.yaml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
max_parallel_workers: 4
|
||||||
|
source_db_type: postgres
|
||||||
|
target_db_type: sqlserver
|
||||||
|
|
||||||
|
defaults:
|
||||||
|
batches_per_partition: 4
|
||||||
|
max_extractors: 8
|
||||||
|
extractor_batch_size: 25000
|
||||||
|
extractor_queue_size: 32
|
||||||
|
max_transformers: 8
|
||||||
|
transformer_batch_size: 50000
|
||||||
|
transformer_queue_size: 32
|
||||||
|
max_loaders: 8
|
||||||
|
loader_batch_size: 50000
|
||||||
|
partition_calculation_strategy: EXACT
|
||||||
|
truncate_target: true
|
||||||
|
truncate_method: TRUNCATE
|
||||||
|
retry:
|
||||||
|
attempts: 3
|
||||||
|
base_delay_ms: 500
|
||||||
|
max_delay_ms: 10000
|
||||||
|
max_jitter_ms: 500
|
||||||
|
max_failed_partitions: 5
|
||||||
|
max_failed_batches_load: 5
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
- name: cartografia_manzana_reverse
|
||||||
|
enabled: true
|
||||||
|
source:
|
||||||
|
schema: Cartografia
|
||||||
|
table: MANZANA
|
||||||
|
primary_key: GDB_ARCHIVE_OID
|
||||||
|
target:
|
||||||
|
schema: Cartografia
|
||||||
|
table: MANZANA
|
||||||
91
config.yaml
91
config.yaml
@@ -33,56 +33,45 @@ jobs:
|
|||||||
target:
|
target:
|
||||||
schema: Cartografia
|
schema: Cartografia
|
||||||
table: MANZANA
|
table: MANZANA
|
||||||
pre_sql:
|
|
||||||
- 'SELECT 1'
|
|
||||||
range:
|
|
||||||
min: 1000000
|
|
||||||
max: 2000000
|
|
||||||
is_min_inclusive: false
|
|
||||||
is_max_inclusive: true
|
|
||||||
|
|
||||||
- name: red_puerto
|
# - name: red_puerto
|
||||||
enabled: true
|
# enabled: true
|
||||||
source:
|
# source:
|
||||||
schema: Red
|
# schema: Red
|
||||||
table: PUERTO
|
# table: PUERTO
|
||||||
primary_key: ID_PUERTO
|
# primary_key: ID_PUERTO
|
||||||
from_json:
|
# from_json:
|
||||||
- column: $node_id*
|
# - column: $node_id*
|
||||||
field: id
|
# field: id
|
||||||
target:
|
# target:
|
||||||
schema: Red
|
# schema: Red
|
||||||
table: PUERTO
|
# table: PUERTO
|
||||||
pre_sql:
|
|
||||||
- 'SELECT 1'
|
|
||||||
post_sql:
|
|
||||||
- "SELECT 1"
|
|
||||||
|
|
||||||
- name: infraestructura_site_holder__attach
|
# - name: infraestructura_site_holder__attach
|
||||||
source:
|
# source:
|
||||||
schema: Infraestructura
|
# schema: Infraestructura
|
||||||
table: SITE_HOLDER__ATTACH
|
# table: SITE_HOLDER__ATTACH
|
||||||
primary_key: GDB_ARCHIVE_OID
|
# primary_key: GDB_ARCHIVE_OID
|
||||||
target:
|
# target:
|
||||||
schema: Infraestructura
|
# schema: Infraestructura
|
||||||
table: SITE_HOLDER__ATTACH
|
# table: SITE_HOLDER__ATTACH
|
||||||
to_storage:
|
# to_storage:
|
||||||
columns:
|
# columns:
|
||||||
- source: DATA
|
# - source: DATA
|
||||||
target: FILE_URL
|
# target: FILE_URL
|
||||||
mode: REFERENCE_ONLY
|
# mode: REFERENCE_ONLY
|
||||||
prefix: Infraestructura/SITE_HOLDER__ATTACH
|
# prefix: Infraestructura/SITE_HOLDER__ATTACH
|
||||||
batches_per_partition: 20
|
# batches_per_partition: 20
|
||||||
max_extractors: 32
|
# max_extractors: 32
|
||||||
extractor_batch_size: 1
|
# extractor_batch_size: 1
|
||||||
extractor_queue_size: 100
|
# extractor_queue_size: 100
|
||||||
max_transformers: 48
|
# max_transformers: 48
|
||||||
transformer_batch_size: 500
|
# transformer_batch_size: 500
|
||||||
transformer_queue_size: 8
|
# transformer_queue_size: 8
|
||||||
max_loaders: 4
|
# max_loaders: 4
|
||||||
loader_batch_size: 500
|
# loader_batch_size: 500
|
||||||
retry:
|
# retry:
|
||||||
attempts: 5
|
# attempts: 5
|
||||||
base_delay_ms: 1000
|
# base_delay_ms: 1000
|
||||||
max_delay_ms: 15000
|
# max_delay_ms: 15000
|
||||||
max_jitter_ms: 500
|
# max_jitter_ms: 500
|
||||||
|
|||||||
2
docker/.gitignore
vendored
Normal file
2
docker/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
data/**/*
|
||||||
|
compose.override.yml
|
||||||
50
docker/compose.yml
Normal file
50
docker/compose.yml
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
name: db-migration
|
||||||
|
services:
|
||||||
|
azurite:
|
||||||
|
image: mcr.microsoft.com/azure-storage/azurite:3.35.0
|
||||||
|
container_name: azurite
|
||||||
|
restart: unless-stopped
|
||||||
|
ports:
|
||||||
|
- 8880:10000
|
||||||
|
- 8881:10001
|
||||||
|
- 8882:10002
|
||||||
|
volumes:
|
||||||
|
- ./data/azurite:/data
|
||||||
|
command: 'azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0 --location /data --skipApiVersionCheck'
|
||||||
|
profiles:
|
||||||
|
- storage
|
||||||
|
- target
|
||||||
|
|
||||||
|
mssql:
|
||||||
|
image: mcr.microsoft.com/mssql/server:2022-latest
|
||||||
|
restart: unless-stopped
|
||||||
|
environment:
|
||||||
|
ACCEPT_EULA: Y
|
||||||
|
MSSQL_SA_PASSWORD: SecurePassword123
|
||||||
|
MSSQL_PID: Developer
|
||||||
|
MSSQL_MEMORY_LIMIT_MB: 8192
|
||||||
|
ports:
|
||||||
|
- 8883:1433
|
||||||
|
volumes:
|
||||||
|
- ./data/mssql:/var/opt/mssql
|
||||||
|
profiles:
|
||||||
|
- mssql
|
||||||
|
- source
|
||||||
|
- db
|
||||||
|
|
||||||
|
postgres:
|
||||||
|
image: postgis/postgis:16-3.4
|
||||||
|
restart: unless-stopped
|
||||||
|
environment:
|
||||||
|
POSTGRES_DB: test_db
|
||||||
|
POSTGRES_USER: postgres
|
||||||
|
POSTGRES_PASSWORD: SecurePassword123
|
||||||
|
ports:
|
||||||
|
- 8884:5432
|
||||||
|
volumes:
|
||||||
|
- ./data/postgres:/var/lib/postgresql/data
|
||||||
|
profiles:
|
||||||
|
- postgres
|
||||||
|
- target
|
||||||
|
- db
|
||||||
|
shm_size: '1gb'
|
||||||
@@ -2,6 +2,7 @@ package table_analyzers
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -9,6 +10,7 @@ import (
|
|||||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PostgresTableAnalyzer struct {
|
type PostgresTableAnalyzer struct {
|
||||||
@@ -161,7 +163,30 @@ func (ta *PostgresTableAnalyzer) EstimateTotalRows(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.TableInfo,
|
tableInfo config.TableInfo,
|
||||||
) (int64, error) {
|
) (int64, error) {
|
||||||
return 0, nil
|
query := `
|
||||||
|
SELECT reltuples::bigint
|
||||||
|
FROM pg_class
|
||||||
|
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
|
||||||
|
WHERE pg_namespace.nspname = $1 AND pg_class.relname = $2`
|
||||||
|
|
||||||
|
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var estimate int64
|
||||||
|
err := ta.db.QueryRow(ctxTimeout, query, tableInfo.Schema, tableInfo.Table).Scan(&estimate)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if estimate < 0 {
|
||||||
|
countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM "%s"."%s"`, tableInfo.Schema, tableInfo.Table)
|
||||||
|
err = ta.db.QueryRow(ctxTimeout, countQuery).Scan(&estimate)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return estimate, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn(
|
func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn(
|
||||||
@@ -169,7 +194,19 @@ func (ta *PostgresTableAnalyzer) QueryMaxMinFromColumn(
|
|||||||
tableInfo config.TableInfo,
|
tableInfo config.TableInfo,
|
||||||
columnName string,
|
columnName string,
|
||||||
) (etl.MaxMinColumnResult, error) {
|
) (etl.MaxMinColumnResult, error) {
|
||||||
return etl.MaxMinColumnResult{}, nil
|
query := fmt.Sprintf(`SELECT MIN("%s"), MAX("%s") FROM "%s"."%s"`,
|
||||||
|
columnName, columnName, tableInfo.Schema, tableInfo.Table)
|
||||||
|
|
||||||
|
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
result := etl.MaxMinColumnResult{}
|
||||||
|
err := ta.db.QueryRow(ctxTimeout, query).Scan(&result.Min, &result.Max)
|
||||||
|
if err != nil {
|
||||||
|
return etl.MaxMinColumnResult{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
|
func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
|
||||||
@@ -179,5 +216,78 @@ func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
|
|||||||
maxPartitions int64,
|
maxPartitions int64,
|
||||||
rangeConstraint config.RangeConfig,
|
rangeConstraint config.RangeConfig,
|
||||||
) ([]models.Partition, error) {
|
) ([]models.Partition, error) {
|
||||||
return []models.Partition{}, nil
|
whereClause := ""
|
||||||
|
args := []any{maxPartitions}
|
||||||
|
|
||||||
|
if rangeConstraint.Min != nil || rangeConstraint.Max != nil {
|
||||||
|
var conditions []string
|
||||||
|
if rangeConstraint.Min != nil {
|
||||||
|
minOp := ">"
|
||||||
|
if rangeConstraint.IsMinInclusive {
|
||||||
|
minOp = ">="
|
||||||
|
}
|
||||||
|
args = append(args, *rangeConstraint.Min)
|
||||||
|
conditions = append(conditions, fmt.Sprintf(`"%s" %s $%d`, partitionColumn, minOp, len(args)))
|
||||||
|
}
|
||||||
|
if rangeConstraint.Max != nil {
|
||||||
|
maxOp := "<"
|
||||||
|
if rangeConstraint.IsMaxInclusive {
|
||||||
|
maxOp = "<="
|
||||||
|
}
|
||||||
|
args = append(args, *rangeConstraint.Max)
|
||||||
|
conditions = append(conditions, fmt.Sprintf(`"%s" %s $%d`, partitionColumn, maxOp, len(args)))
|
||||||
|
}
|
||||||
|
whereClause = "WHERE " + strings.Join(conditions, " AND ")
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
SELECT MIN("%s") AS lower_limit, MAX("%s") AS upper_limit
|
||||||
|
FROM (
|
||||||
|
SELECT "%s", NTILE($1) OVER (ORDER BY "%s") AS batch_id
|
||||||
|
FROM "%s"."%s" %s
|
||||||
|
) AS t
|
||||||
|
GROUP BY batch_id
|
||||||
|
ORDER BY batch_id`,
|
||||||
|
partitionColumn,
|
||||||
|
partitionColumn,
|
||||||
|
partitionColumn,
|
||||||
|
partitionColumn,
|
||||||
|
tableInfo.Schema,
|
||||||
|
tableInfo.Table,
|
||||||
|
whereClause)
|
||||||
|
|
||||||
|
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
rows, err := ta.db.Query(ctxTimeout, query, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
partitions := make([]models.Partition, 0, maxPartitions)
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
partition := models.Partition{
|
||||||
|
Id: uuid.New(),
|
||||||
|
HasRange: true,
|
||||||
|
RetryCounter: 0,
|
||||||
|
Range: models.PartitionRange{
|
||||||
|
IsMinInclusive: true,
|
||||||
|
IsMaxInclusive: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
partitions = append(partitions, partition)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return partitions, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,6 +59,65 @@ func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransfor
|
|||||||
return plan
|
return plan
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func computePostgresTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
||||||
|
var plan []etl.ColumnTransformPlan
|
||||||
|
|
||||||
|
for i, col := range columns {
|
||||||
|
switch col.SystemType() {
|
||||||
|
case "int2", "int4", "int8", "integer", "smallint", "bigint":
|
||||||
|
plan = append(plan, etl.ColumnTransformPlan{
|
||||||
|
Index: i,
|
||||||
|
Fn: func(v any) (any, error) {
|
||||||
|
if v64, ok := ToInt64(v); ok {
|
||||||
|
return v64, nil
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
case "uuid":
|
||||||
|
plan = append(plan, etl.ColumnTransformPlan{
|
||||||
|
Index: i,
|
||||||
|
Fn: func(v any) (any, error) {
|
||||||
|
switch b := v.(type) {
|
||||||
|
case []byte:
|
||||||
|
if b != nil {
|
||||||
|
return bigEndianToMssqlUuid(b)
|
||||||
|
}
|
||||||
|
case [16]byte:
|
||||||
|
return bigEndianToMssqlUuid(b[:])
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
case "geometry":
|
||||||
|
plan = append(plan, etl.ColumnTransformPlan{
|
||||||
|
Index: i,
|
||||||
|
Fn: func(v any) (any, error) {
|
||||||
|
if b, ok := v.([]byte); ok && b != nil {
|
||||||
|
return ewkbToMssqlGeo(b, false)
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
case "geography":
|
||||||
|
plan = append(plan, etl.ColumnTransformPlan{
|
||||||
|
Index: i,
|
||||||
|
Fn: func(v any) (any, error) {
|
||||||
|
if b, ok := v.([]byte); ok && b != nil {
|
||||||
|
return ewkbToMssqlGeo(b, true)
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return plan
|
||||||
|
}
|
||||||
|
|
||||||
func computeStorageTransformationPlan(
|
func computeStorageTransformationPlan(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
azureClient *azure.Client,
|
azureClient *azure.Client,
|
||||||
|
|||||||
72
internal/app/etl/transformers/postgres.go
Normal file
72
internal/app/etl/transformers/postgres.go
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
package transformers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PostgresTransformer struct {
|
||||||
|
sourceTable config.SourceTableInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPostgresTransformer(sourceTable config.SourceTableInfo) etl.Transformer {
|
||||||
|
return &PostgresTransformer{sourceTable: sourceTable}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pgTr *PostgresTransformer) Consume(
|
||||||
|
ctx context.Context,
|
||||||
|
columns []models.ColumnType,
|
||||||
|
retryConfig config.RetryConfig,
|
||||||
|
batchSize int,
|
||||||
|
chBatchesIn <-chan models.Batch,
|
||||||
|
chBatchesOut chan<- models.Batch,
|
||||||
|
chJobErrorsOut chan<- custom_errors.JobError,
|
||||||
|
wgActiveBatches *sync.WaitGroup,
|
||||||
|
) {
|
||||||
|
transformationPlan := computePostgresTransformationPlan(columns)
|
||||||
|
|
||||||
|
acc := &batchAccumulator{batchSize: batchSize}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case batch, ok := <-chBatchesIn:
|
||||||
|
if !ok {
|
||||||
|
acc.flush(ctx, chBatchesOut, wgActiveBatches)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(transformationPlan) > 0 {
|
||||||
|
if err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig); err != nil {
|
||||||
|
sendTransformError(ctx, err, chJobErrorsOut)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if batchSize <= 0 {
|
||||||
|
wgActiveBatches.Add(1)
|
||||||
|
select {
|
||||||
|
case chBatchesOut <- batch:
|
||||||
|
case <-ctx.Done():
|
||||||
|
wgActiveBatches.Done()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.add(batch)
|
||||||
|
if acc.ready() {
|
||||||
|
if !acc.flush(ctx, chBatchesOut, wgActiveBatches) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,8 @@ import (
|
|||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
mssqlclrgeo "github.com/gaspardle/go-mssqlclrgeo"
|
||||||
)
|
)
|
||||||
|
|
||||||
func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
|
func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
|
||||||
@@ -62,6 +64,29 @@ func ensureUTC(t time.Time) time.Time {
|
|||||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
|
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func bigEndianToMssqlUuid(pgUuid []byte) ([]byte, error) {
|
||||||
|
if len(pgUuid) != 16 {
|
||||||
|
return nil, errors.New("Invalid uuid")
|
||||||
|
}
|
||||||
|
|
||||||
|
mssqlUuid := make([]byte, 16)
|
||||||
|
mssqlUuid[0], mssqlUuid[1], mssqlUuid[2], mssqlUuid[3] = pgUuid[3], pgUuid[2], pgUuid[1], pgUuid[0]
|
||||||
|
mssqlUuid[4], mssqlUuid[5] = pgUuid[5], pgUuid[4]
|
||||||
|
mssqlUuid[6], mssqlUuid[7] = pgUuid[7], pgUuid[6]
|
||||||
|
copy(mssqlUuid[8:], pgUuid[8:])
|
||||||
|
|
||||||
|
return mssqlUuid, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error) {
|
||||||
|
if len(ewkb) < 5 {
|
||||||
|
return nil, errors.New("Invalid ewkb")
|
||||||
|
}
|
||||||
|
// mssqlclrgeo reads the SRID flag and bytes directly from EWKB,
|
||||||
|
// so no pre-processing needed — pass through as-is.
|
||||||
|
return mssqlclrgeo.WkbToUdtGeo(ewkb, isGeography)
|
||||||
|
}
|
||||||
|
|
||||||
func ToInt64(v any) (int64, bool) {
|
func ToInt64(v any) (int64, bool) {
|
||||||
switch t := v.(type) {
|
switch t := v.(type) {
|
||||||
case int:
|
case int:
|
||||||
|
|||||||
137
openspec/changes/bidirectional-transforms/plan.md
Normal file
137
openspec/changes/bidirectional-transforms/plan.md
Normal file
@@ -0,0 +1,137 @@
|
|||||||
|
# Plan: Bidirectional Transformation Support
|
||||||
|
|
||||||
|
## Goal
|
||||||
|
|
||||||
|
Make the transformation pipeline direction-aware. Currently hardcoded to MSSQL → PG; add support for PG → MSSQL by applying inverse transformations when `SourceDbType == "postgres"`.
|
||||||
|
|
||||||
|
Excluded: `to_storage` Azure blob upload (not reversible).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Hardcoded wiring to fix
|
||||||
|
|
||||||
|
| File | Line | Change |
|
||||||
|
|---|---|---|
|
||||||
|
| `cmd/go_migrate/process.go` | 51 | Branch on `SourceDbType`: `"sqlserver"` → `NewMssqlTransformer`, `"postgres"` → `NewPostgresTransformer` |
|
||||||
|
| `cmd/go_migrate/main.go` | 166–167 | Branch on source/target type for both `TableAnalyzer` selections |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Transformations
|
||||||
|
|
||||||
|
### Forward (MSSQL → PG) — unchanged
|
||||||
|
|
||||||
|
| Column type | Function | File |
|
||||||
|
|---|---|---|
|
||||||
|
| `uniqueidentifier` | `mssqlUuidToBigEndian` | `utils.go:9` |
|
||||||
|
| `geometry`/`geography` | `wkbToEwkbWithSrid` | `utils.go:25` |
|
||||||
|
| `datetime`/`datetime2` | `ensureUTC` | `utils.go:57` |
|
||||||
|
|
||||||
|
### Inverse (PG → MSSQL) — new
|
||||||
|
|
||||||
|
| PG system type | Action |
|
||||||
|
|---|---|
|
||||||
|
| `uuid` | `bigEndianToMssqlUuid`: re-swap bytes [0-3], [4-5], [6-7] |
|
||||||
|
| `geometry` | `ewkbToMssqlGeo(v, false)`: strip SRID → WKB → `WkbToUdtGeo` |
|
||||||
|
| `geography` | `ewkbToMssqlGeo(v, true)`: strip SRID → WKB → `WkbToUdtGeo` |
|
||||||
|
| `timestamp`/`timestamptz` | no-op |
|
||||||
|
|
||||||
|
**Geometry note**: MSSQL rejects plain WKB via bulk protocol. Must use `mssqlclrgeo.WkbToUdtGeo(wkb, isGeography)` (already in go.mod). PG extractor already emits EWKB via `ST_AsEWKB()`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## New utility functions (`transformers/utils.go`)
|
||||||
|
|
||||||
|
### `bigEndianToMssqlUuid(v []byte) []byte`
|
||||||
|
```
|
||||||
|
out[0..3] = v[3,2,1,0]
|
||||||
|
out[4..5] = v[5,4]
|
||||||
|
out[6..7] = v[7,6]
|
||||||
|
out[8..15] = v[8..15]
|
||||||
|
```
|
||||||
|
|
||||||
|
### `ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error)`
|
||||||
|
1. Read byte-order flag from `ewkb[0]`
|
||||||
|
2. Read geometry type word bytes [1..4]
|
||||||
|
3. If SRID flag (`0x20000000`) is set: strip bytes [5..8], clear flag in type word
|
||||||
|
4. Call `mssqlclrgeo.WkbToUdtGeo(wkb, isGeography)`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## New files
|
||||||
|
|
||||||
|
### `transformers/postgres.go`
|
||||||
|
```go
|
||||||
|
func NewPostgresTransformer(...) *Transformer {
|
||||||
|
// same signature as NewMssqlTransformer
|
||||||
|
// calls computePostgresTransformationPlan instead
|
||||||
|
// does NOT call computeStorageTransformationPlan
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### `computePostgresTransformationPlan` in `transformers/plan.go`
|
||||||
|
Iterates `sourceColTypes` (from PG analyzer), applies inverse closures by system type.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## PostgreSQL table analyzer stubs to implement (`table_analyzers/postgres.go`)
|
||||||
|
|
||||||
|
Required for PG-as-source partitioned extraction:
|
||||||
|
|
||||||
|
### `EstimateTotalRows`
|
||||||
|
```sql
|
||||||
|
SELECT reltuples::bigint FROM pg_class
|
||||||
|
JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
|
||||||
|
WHERE pg_namespace.nspname = $schema AND pg_class.relname = $table
|
||||||
|
```
|
||||||
|
Fallback to `COUNT(*)` if `reltuples < 0`.
|
||||||
|
|
||||||
|
### `QueryMaxMinFromColumn`
|
||||||
|
```sql
|
||||||
|
SELECT MIN("col"), MAX("col") FROM "schema"."table"
|
||||||
|
```
|
||||||
|
|
||||||
|
### `CalculatePartitionRanges`
|
||||||
|
Use min/max from above + `rowsPerPartition` to compute boundaries. Mirror the logic from `MssqlTableAnalyzer.CalculatePartitionRanges`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Test cases
|
||||||
|
|
||||||
|
### TC-1: `bigEndianToMssqlUuid` — round-trip
|
||||||
|
- Input: run `mssqlUuidToBigEndian` on a known 16-byte MSSQL UUID → produces PG UUID
|
||||||
|
- Assert: `bigEndianToMssqlUuid(pgUUID)` == original MSSQL UUID bytes
|
||||||
|
- Also assert nil input → nil output (no panic)
|
||||||
|
|
||||||
|
### TC-2: `bigEndianToMssqlUuid` — known vector
|
||||||
|
- Input: `[0x6b,0xa7,0xb8,0x10, 0x9d,0xad, 0x11,0xd1, 0x80,0xb4,0x00,0xc0,0x4f,0xd4,0x30,0xc8]` (RFC 4122 nil UUID variant)
|
||||||
|
- Assert: bytes [0-3] are reversed, [4-5] reversed, [6-7] reversed, [8-15] identical
|
||||||
|
|
||||||
|
### TC-3: `ewkbToMssqlGeo` — geometry round-trip
|
||||||
|
- Input: generate a polygon via `go-geom` + `wkb.Marshal` → plain WKB
|
||||||
|
- Forward: run `wkbToEwkbWithSrid` → EWKB
|
||||||
|
- Inverse: run `ewkbToMssqlGeo(ewkb, false)` → CLR/UDT bytes
|
||||||
|
- Assert: no error, output is non-empty `[]byte`
|
||||||
|
|
||||||
|
### TC-4: `ewkbToMssqlGeo` — nil input
|
||||||
|
- Input: nil
|
||||||
|
- Assert: returns nil, nil (no panic)
|
||||||
|
|
||||||
|
### TC-5: `ewkbToMssqlGeo` — EWKB without SRID flag
|
||||||
|
- Input: plain WKB (no SRID flag set)
|
||||||
|
- Assert: function still calls `WkbToUdtGeo` and returns without error
|
||||||
|
|
||||||
|
### TC-6: Transformer factory selection
|
||||||
|
- Given `SourceDbType == "postgres"` → `NewPostgresTransformer` is selected
|
||||||
|
- Given `SourceDbType == "sqlserver"` → `NewMssqlTransformer` is selected
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Files changed (summary)
|
||||||
|
|
||||||
|
1. `cmd/go_migrate/process.go` — transformer factory branch
|
||||||
|
2. `cmd/go_migrate/main.go` — analyzer selection branch
|
||||||
|
3. `internal/app/etl/transformers/utils.go` — 2 new functions
|
||||||
|
4. `internal/app/etl/transformers/plan.go` — `computePostgresTransformationPlan`
|
||||||
|
5. `internal/app/etl/transformers/postgres.go` *(new)*
|
||||||
|
6. `internal/app/etl/table_analyzers/postgres.go` — 3 stub implementations
|
||||||
@@ -12,10 +12,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// totalRows int = 1_000_000
|
totalRows int = 2_000_000
|
||||||
totalRows int = 1000
|
chunkSize int = 5000
|
||||||
chunkSize int = 200
|
queueSize int = 8
|
||||||
queueSize int = 4
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -41,13 +40,13 @@ func main() {
|
|||||||
seedManzanas(ctx, db)
|
seedManzanas(ctx, db)
|
||||||
})
|
})
|
||||||
|
|
||||||
wgSeed.Go(func() {
|
// wgSeed.Go(func() {
|
||||||
seedPuertos(ctx, db)
|
// seedPuertos(ctx, db)
|
||||||
})
|
// })
|
||||||
|
|
||||||
wgSeed.Go(func() {
|
// wgSeed.Go(func() {
|
||||||
seedSiteHolderAttach(ctx, db)
|
// seedSiteHolderAttach(ctx, db)
|
||||||
})
|
// })
|
||||||
|
|
||||||
wgSeed.Wait()
|
wgSeed.Wait()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user