Files

4.6 KiB
Raw Permalink Blame History

Plan: Bidirectional Transformation Support

Goal

Make the transformation pipeline direction-aware. Currently hardcoded to MSSQL → PG; add support for PG → MSSQL by applying inverse transformations when SourceDbType == "postgres".

Excluded: to_storage Azure blob upload (not reversible).


Hardcoded wiring to fix

File Line Change
cmd/go_migrate/process.go 51 Branch on SourceDbType: "sqlserver"NewMssqlTransformer, "postgres"NewPostgresTransformer
cmd/go_migrate/main.go 166167 Branch on source/target type for both TableAnalyzer selections

Transformations

Forward (MSSQL → PG) — unchanged

Column type Function File
uniqueidentifier mssqlUuidToBigEndian utils.go:9
geometry/geography wkbToEwkbWithSrid utils.go:25
datetime/datetime2 ensureUTC utils.go:57

Inverse (PG → MSSQL) — new

PG system type Action
uuid bigEndianToMssqlUuid: re-swap bytes [0-3], [4-5], [6-7]
geometry ewkbToMssqlGeo(v, false): strip SRID → WKB → WkbToUdtGeo
geography ewkbToMssqlGeo(v, true): strip SRID → WKB → WkbToUdtGeo
timestamp/timestamptz no-op

Geometry note: MSSQL rejects plain WKB via bulk protocol. Must use mssqlclrgeo.WkbToUdtGeo(wkb, isGeography) (already in go.mod). PG extractor already emits EWKB via ST_AsEWKB().


New utility functions (transformers/utils.go)

bigEndianToMssqlUuid(v []byte) []byte

out[0..3]  = v[3,2,1,0]
out[4..5]  = v[5,4]
out[6..7]  = v[7,6]
out[8..15] = v[8..15]

ewkbToMssqlGeo(ewkb []byte, isGeography bool) ([]byte, error)

  1. Read byte-order flag from ewkb[0]
  2. Read geometry type word bytes [1..4]
  3. If SRID flag (0x20000000) is set: strip bytes [5..8], clear flag in type word
  4. Call mssqlclrgeo.WkbToUdtGeo(wkb, isGeography)

New files

transformers/postgres.go

func NewPostgresTransformer(...) *Transformer {
    // same signature as NewMssqlTransformer
    // calls computePostgresTransformationPlan instead
    // does NOT call computeStorageTransformationPlan
}

computePostgresTransformationPlan in transformers/plan.go

Iterates sourceColTypes (from PG analyzer), applies inverse closures by system type.


PostgreSQL table analyzer stubs to implement (table_analyzers/postgres.go)

Required for PG-as-source partitioned extraction:

EstimateTotalRows

SELECT reltuples::bigint FROM pg_class
  JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace
  WHERE pg_namespace.nspname = $schema AND pg_class.relname = $table

Fallback to COUNT(*) if reltuples < 0.

QueryMaxMinFromColumn

SELECT MIN("col"), MAX("col") FROM "schema"."table"

CalculatePartitionRanges

Use min/max from above + rowsPerPartition to compute boundaries. Mirror the logic from MssqlTableAnalyzer.CalculatePartitionRanges.


Test cases

TC-1: bigEndianToMssqlUuid — round-trip

  • Input: run mssqlUuidToBigEndian on a known 16-byte MSSQL UUID → produces PG UUID
  • Assert: bigEndianToMssqlUuid(pgUUID) == original MSSQL UUID bytes
  • Also assert nil input → nil output (no panic)

TC-2: bigEndianToMssqlUuid — known vector

  • Input: [0x6b,0xa7,0xb8,0x10, 0x9d,0xad, 0x11,0xd1, 0x80,0xb4,0x00,0xc0,0x4f,0xd4,0x30,0xc8] (RFC 4122 nil UUID variant)
  • Assert: bytes [0-3] are reversed, [4-5] reversed, [6-7] reversed, [8-15] identical

TC-3: ewkbToMssqlGeo — geometry round-trip

  • Input: generate a polygon via go-geom + wkb.Marshal → plain WKB
  • Forward: run wkbToEwkbWithSrid → EWKB
  • Inverse: run ewkbToMssqlGeo(ewkb, false) → CLR/UDT bytes
  • Assert: no error, output is non-empty []byte

TC-4: ewkbToMssqlGeo — nil input

  • Input: nil
  • Assert: returns nil, nil (no panic)

TC-5: ewkbToMssqlGeo — EWKB without SRID flag

  • Input: plain WKB (no SRID flag set)
  • Assert: function still calls WkbToUdtGeo and returns without error

TC-6: Transformer factory selection

  • Given SourceDbType == "postgres"NewPostgresTransformer is selected
  • Given SourceDbType == "sqlserver"NewMssqlTransformer is selected

Files changed (summary)

  1. cmd/go_migrate/process.go — transformer factory branch
  2. cmd/go_migrate/main.go — analyzer selection branch
  3. internal/app/etl/transformers/utils.go — 2 new functions
  4. internal/app/etl/transformers/plan.gocomputePostgresTransformationPlan
  5. internal/app/etl/transformers/postgres.go (new)
  6. internal/app/etl/table_analyzers/postgres.go — 3 stub implementations