diff --git a/config-reverse.yaml b/config-reverse.yaml new file mode 100644 index 0000000..c1e88e9 --- /dev/null +++ b/config-reverse.yaml @@ -0,0 +1,35 @@ +max_parallel_workers: 2 +source_db_type: postgres +target_db_type: sqlserver + +defaults: + batches_per_partition: 2 + max_extractors: 2 + extractor_batch_size: 500 + extractor_queue_size: 4 + max_transformers: 2 + transformer_batch_size: 2500 + transformer_queue_size: 4 + max_loaders: 2 + loader_batch_size: 5000 + partition_calculation_strategy: EXACT + truncate_target: true + truncate_method: TRUNCATE + retry: + attempts: 3 + base_delay_ms: 500 + max_delay_ms: 10000 + max_jitter_ms: 500 + max_failed_partitions: 5 + max_failed_batches_load: 5 + +jobs: + - name: cartografia_manzana_reverse + enabled: true + source: + schema: Cartografia + table: MANZANA + primary_key: GDB_ARCHIVE_OID + target: + schema: Cartografia + table: MANZANA diff --git a/config.yaml b/config.yaml index 169ce06..c8ea2d0 100644 --- a/config.yaml +++ b/config.yaml @@ -33,56 +33,45 @@ jobs: target: schema: Cartografia table: MANZANA - pre_sql: - - 'SELECT 1' - range: - min: 1000000 - max: 2000000 - is_min_inclusive: false - is_max_inclusive: true - - name: red_puerto - enabled: true - source: - schema: Red - table: PUERTO - primary_key: ID_PUERTO - from_json: - - column: $node_id* - field: id - target: - schema: Red - table: PUERTO - pre_sql: - - 'SELECT 1' - post_sql: - - "SELECT 1" + # - name: red_puerto + # enabled: true + # source: + # schema: Red + # table: PUERTO + # primary_key: ID_PUERTO + # from_json: + # - column: $node_id* + # field: id + # target: + # schema: Red + # table: PUERTO - - name: infraestructura_site_holder__attach - source: - schema: Infraestructura - table: SITE_HOLDER__ATTACH - primary_key: GDB_ARCHIVE_OID - target: - schema: Infraestructura - table: SITE_HOLDER__ATTACH - to_storage: - columns: - - source: DATA - target: FILE_URL - mode: REFERENCE_ONLY - prefix: Infraestructura/SITE_HOLDER__ATTACH - batches_per_partition: 20 - max_extractors: 32 - extractor_batch_size: 1 - extractor_queue_size: 100 - max_transformers: 48 - transformer_batch_size: 500 - transformer_queue_size: 8 - max_loaders: 4 - loader_batch_size: 500 - retry: - attempts: 5 - base_delay_ms: 1000 - max_delay_ms: 15000 - max_jitter_ms: 500 + # - name: infraestructura_site_holder__attach + # source: + # schema: Infraestructura + # table: SITE_HOLDER__ATTACH + # primary_key: GDB_ARCHIVE_OID + # target: + # schema: Infraestructura + # table: SITE_HOLDER__ATTACH + # to_storage: + # columns: + # - source: DATA + # target: FILE_URL + # mode: REFERENCE_ONLY + # prefix: Infraestructura/SITE_HOLDER__ATTACH + # batches_per_partition: 20 + # max_extractors: 32 + # extractor_batch_size: 1 + # extractor_queue_size: 100 + # max_transformers: 48 + # transformer_batch_size: 500 + # transformer_queue_size: 8 + # max_loaders: 4 + # loader_batch_size: 500 + # retry: + # attempts: 5 + # base_delay_ms: 1000 + # max_delay_ms: 15000 + # max_jitter_ms: 500 diff --git a/internal/app/etl/transformers/plan.go b/internal/app/etl/transformers/plan.go index f03f286..a695f64 100644 --- a/internal/app/etl/transformers/plan.go +++ b/internal/app/etl/transformers/plan.go @@ -64,12 +64,28 @@ func computePostgresTransformationPlan(columns []models.ColumnType) []etl.Column for i, col := range columns { switch col.SystemType() { + case "int2", "int4", "int8", "integer", "smallint", "bigint": + plan = append(plan, etl.ColumnTransformPlan{ + Index: i, + Fn: func(v any) (any, error) { + if v64, ok := ToInt64(v); ok { + return v64, nil + } + return v, nil + }, + }) + case "uuid": plan = append(plan, etl.ColumnTransformPlan{ Index: i, Fn: func(v any) (any, error) { - if b, ok := v.([]byte); ok && b != nil { - return bigEndianToMssqlUuid(b) + switch b := v.(type) { + case []byte: + if b != nil { + return bigEndianToMssqlUuid(b) + } + case [16]byte: + return bigEndianToMssqlUuid(b[:]) } return v, nil }, diff --git a/internal/app/etl/transformers/utils.go b/internal/app/etl/transformers/utils.go index 24f4360..91512ae 100644 --- a/internal/app/etl/transformers/utils.go +++ b/internal/app/etl/transformers/utils.go @@ -82,31 +82,9 @@ func ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error) { if len(ewkb) < 5 { return nil, errors.New("Invalid ewkb") } - - var byteOrder binary.ByteOrder - if ewkb[0] == 0 { - byteOrder = binary.BigEndian - } else { - byteOrder = binary.LittleEndian - } - - wkbType := byteOrder.Uint32(ewkb[1:5]) - - var wkb []byte - if wkbType&sridFlag != 0 { - if len(ewkb) < 9 { - return nil, errors.New("Invalid ewkb: SRID flag set but data too short") - } - clearType := wkbType &^ uint32(sridFlag) - wkb = make([]byte, len(ewkb)-4) - wkb[0] = ewkb[0] - byteOrder.PutUint32(wkb[1:5], clearType) - copy(wkb[5:], ewkb[9:]) - } else { - wkb = ewkb - } - - return mssqlclrgeo.WkbToUdtGeo(wkb, isGeography) + // mssqlclrgeo reads the SRID flag and bytes directly from EWKB, + // so no pre-processing needed — pass through as-is. + return mssqlclrgeo.WkbToUdtGeo(ewkb, isGeography) } func ToInt64(v any) (int64, bool) { diff --git a/scripts/mssql-copy-in/main.go b/scripts/mssql-copy-in/main.go index ea251b5..c25535c 100644 --- a/scripts/mssql-copy-in/main.go +++ b/scripts/mssql-copy-in/main.go @@ -41,13 +41,13 @@ func main() { seedManzanas(ctx, db) }) - wgSeed.Go(func() { - seedPuertos(ctx, db) - }) + // wgSeed.Go(func() { + // seedPuertos(ctx, db) + // }) - wgSeed.Go(func() { - seedSiteHolderAttach(ctx, db) - }) + // wgSeed.Go(func() { + // seedSiteHolderAttach(ctx, db) + // }) wgSeed.Wait() }