refactor: add reverse configuration and enhance transformation logic for PostgreSQL to SQL Server migration

This commit is contained in:
2026-05-29 13:23:00 -05:00
parent f844004942
commit 13cd02a824
5 changed files with 102 additions and 84 deletions

35
config-reverse.yaml Normal file
View File

@@ -0,0 +1,35 @@
max_parallel_workers: 2
source_db_type: postgres
target_db_type: sqlserver
defaults:
batches_per_partition: 2
max_extractors: 2
extractor_batch_size: 500
extractor_queue_size: 4
max_transformers: 2
transformer_batch_size: 2500
transformer_queue_size: 4
max_loaders: 2
loader_batch_size: 5000
partition_calculation_strategy: EXACT
truncate_target: true
truncate_method: TRUNCATE
retry:
attempts: 3
base_delay_ms: 500
max_delay_ms: 10000
max_jitter_ms: 500
max_failed_partitions: 5
max_failed_batches_load: 5
jobs:
- name: cartografia_manzana_reverse
enabled: true
source:
schema: Cartografia
table: MANZANA
primary_key: GDB_ARCHIVE_OID
target:
schema: Cartografia
table: MANZANA

View File

@@ -33,56 +33,45 @@ jobs:
target: target:
schema: Cartografia schema: Cartografia
table: MANZANA table: MANZANA
pre_sql:
- 'SELECT 1'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto # - name: red_puerto
enabled: true # enabled: true
source: # source:
schema: Red # schema: Red
table: PUERTO # table: PUERTO
primary_key: ID_PUERTO # primary_key: ID_PUERTO
from_json: # from_json:
- column: $node_id* # - column: $node_id*
field: id # field: id
target: # target:
schema: Red # schema: Red
table: PUERTO # table: PUERTO
pre_sql:
- 'SELECT 1'
post_sql:
- "SELECT 1"
- name: infraestructura_site_holder__attach # - name: infraestructura_site_holder__attach
source: # source:
schema: Infraestructura # schema: Infraestructura
table: SITE_HOLDER__ATTACH # table: SITE_HOLDER__ATTACH
primary_key: GDB_ARCHIVE_OID # primary_key: GDB_ARCHIVE_OID
target: # target:
schema: Infraestructura # schema: Infraestructura
table: SITE_HOLDER__ATTACH # table: SITE_HOLDER__ATTACH
to_storage: # to_storage:
columns: # columns:
- source: DATA # - source: DATA
target: FILE_URL # target: FILE_URL
mode: REFERENCE_ONLY # mode: REFERENCE_ONLY
prefix: Infraestructura/SITE_HOLDER__ATTACH # prefix: Infraestructura/SITE_HOLDER__ATTACH
batches_per_partition: 20 # batches_per_partition: 20
max_extractors: 32 # max_extractors: 32
extractor_batch_size: 1 # extractor_batch_size: 1
extractor_queue_size: 100 # extractor_queue_size: 100
max_transformers: 48 # max_transformers: 48
transformer_batch_size: 500 # transformer_batch_size: 500
transformer_queue_size: 8 # transformer_queue_size: 8
max_loaders: 4 # max_loaders: 4
loader_batch_size: 500 # loader_batch_size: 500
retry: # retry:
attempts: 5 # attempts: 5
base_delay_ms: 1000 # base_delay_ms: 1000
max_delay_ms: 15000 # max_delay_ms: 15000
max_jitter_ms: 500 # max_jitter_ms: 500

View File

@@ -64,12 +64,28 @@ func computePostgresTransformationPlan(columns []models.ColumnType) []etl.Column
for i, col := range columns { for i, col := range columns {
switch col.SystemType() { switch col.SystemType() {
case "int2", "int4", "int8", "integer", "smallint", "bigint":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if v64, ok := ToInt64(v); ok {
return v64, nil
}
return v, nil
},
})
case "uuid": case "uuid":
plan = append(plan, etl.ColumnTransformPlan{ plan = append(plan, etl.ColumnTransformPlan{
Index: i, Index: i,
Fn: func(v any) (any, error) { Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil { switch b := v.(type) {
return bigEndianToMssqlUuid(b) case []byte:
if b != nil {
return bigEndianToMssqlUuid(b)
}
case [16]byte:
return bigEndianToMssqlUuid(b[:])
} }
return v, nil return v, nil
}, },

View File

@@ -82,31 +82,9 @@ func ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error) {
if len(ewkb) < 5 { if len(ewkb) < 5 {
return nil, errors.New("Invalid ewkb") return nil, errors.New("Invalid ewkb")
} }
// mssqlclrgeo reads the SRID flag and bytes directly from EWKB,
var byteOrder binary.ByteOrder // so no pre-processing needed — pass through as-is.
if ewkb[0] == 0 { return mssqlclrgeo.WkbToUdtGeo(ewkb, isGeography)
byteOrder = binary.BigEndian
} else {
byteOrder = binary.LittleEndian
}
wkbType := byteOrder.Uint32(ewkb[1:5])
var wkb []byte
if wkbType&sridFlag != 0 {
if len(ewkb) < 9 {
return nil, errors.New("Invalid ewkb: SRID flag set but data too short")
}
clearType := wkbType &^ uint32(sridFlag)
wkb = make([]byte, len(ewkb)-4)
wkb[0] = ewkb[0]
byteOrder.PutUint32(wkb[1:5], clearType)
copy(wkb[5:], ewkb[9:])
} else {
wkb = ewkb
}
return mssqlclrgeo.WkbToUdtGeo(wkb, isGeography)
} }
func ToInt64(v any) (int64, bool) { func ToInt64(v any) (int64, bool) {

View File

@@ -41,13 +41,13 @@ func main() {
seedManzanas(ctx, db) seedManzanas(ctx, db)
}) })
wgSeed.Go(func() { // wgSeed.Go(func() {
seedPuertos(ctx, db) // seedPuertos(ctx, db)
}) // })
wgSeed.Go(func() { // wgSeed.Go(func() {
seedSiteHolderAttach(ctx, db) // seedSiteHolderAttach(ctx, db)
}) // })
wgSeed.Wait() wgSeed.Wait()
} }