From 853be4a5a6453dd203056e3e676566e64af9c428 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Wed, 8 Apr 2026 20:48:36 -0500 Subject: [PATCH] feat: update chunk size for MSSQL processing and enhance error handling in transformation functions --- cmd/go_migrate/main.go | 2 +- cmd/go_migrate/mssql-transform.go | 33 ++++++++++++---- cmd/go_migrate/process.go | 8 ++-- cmd/go_migrate/transformer.go | 66 ++++++++++++++++++------------- 4 files changed, 69 insertions(+), 40 deletions(-) diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index a33b1dd..49a7ec5 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -28,7 +28,7 @@ var migrationJobs []MigrationJob = []MigrationJob{ const ( NumExtractors int = 4 NumLoaders int = 8 - ChunkSize int = 25000 + ChunkSize int = 50000 QueueSize int = 8 ChunksPerBatch int = 16 RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch) diff --git a/cmd/go_migrate/mssql-transform.go b/cmd/go_migrate/mssql-transform.go index d31195b..a5ae098 100644 --- a/cmd/go_migrate/mssql-transform.go +++ b/cmd/go_migrate/mssql-transform.go @@ -2,27 +2,29 @@ package main import ( "encoding/binary" + "errors" "time" ) -func mssqlUuidToBigEndian(mssqlUuid []byte) []byte { +func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) { if len(mssqlUuid) != 16 { - return mssqlUuid + return nil, errors.New("Invalid uuid") } + pgUuid := make([]byte, 16) pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0] pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4] pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6] copy(pgUuid[8:], mssqlUuid[8:]) - return pgUuid + return pgUuid, nil } const sridFlag = 0x20000000 -func wkbToEwkbWithSrid(geometry []byte, srid int) []byte { +func wkbToEwkbWithSrid(geometry []byte, srid int) ([]byte, error) { if len(geometry) < 5 { - return geometry + return nil, errors.New("Invalid wkb") } var byteOrder binary.ByteOrder @@ -34,7 +36,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte { wkbType := byteOrder.Uint32(geometry[1:5]) if wkbType&sridFlag != 0 { - return geometry + return geometry, nil } ewkbType := wkbType | sridFlag @@ -49,7 +51,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte { copy(result[9:], geometry[5:]) - return result + return result, nil } func ensureUTC(t time.Time) time.Time { @@ -59,3 +61,20 @@ func ensureUTC(t time.Time) time.Time { return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) } + +func ToInt64(v any) (int64, bool) { + switch t := v.(type) { + case int: + return int64(t), true + case int8: + return int64(t), true + case int16: + return int64(t), true + case int32: + return int64(t), true + case int64: + return int64(t), true + default: + return 0, false + } +} diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index c2d2311..4589dac 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -74,20 +74,20 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration log.Infof("Extraction completed in %v", time.Since(extractStartTime)) }() - chRowsTransform := make(chan []UnknownRowValues, QueueSize) + chChunksTransform := make(chan []UnknownRowValues, QueueSize) var wgMssqlTransformers sync.WaitGroup log.Infof("Starting %d MSSQL transformers...", maxExtractors) transformStartTime := time.Now() for range maxExtractors { wgMssqlTransformers.Go(func() { - transformRowsMssql(sourceColTypes, chChunks, chRowsTransform) + transformRowsMssql(sourceColTypes, chChunks, chChunksTransform, chJobErrors) }) } go func() { wgMssqlTransformers.Wait() - close(chRowsTransform) + close(chChunksTransform) log.Infof("Transformation completed in %v", time.Since(transformStartTime)) }() @@ -98,7 +98,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration for range NumLoaders { wgPostgresLoaders.Go(func() { - if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chRowsTransform); err != nil { + if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chChunksTransform); err != nil { log.Error("Unexpected error loading data into postgres: ", err) } }) diff --git a/cmd/go_migrate/transformer.go b/cmd/go_migrate/transformer.go index ceb56d0..21fa033 100644 --- a/cmd/go_migrate/transformer.go +++ b/cmd/go_migrate/transformer.go @@ -6,26 +6,53 @@ import ( log "github.com/sirupsen/logrus" ) -func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) { +func transformRowsMssql( + columns []ColumnType, + chChunksIn <-chan []UnknownRowValues, + chChunksOut chan<- []UnknownRowValues, + chJobErrorsOut chan<- JobError, +) { chunkCount := 0 totalRowsTransformed := 0 - for rows := range in { + for rows := range chChunksIn { chunkStartTime := time.Now() - log.Debugf("Chunk #%d received, transforming %d rows...", chunkCount+1, len(rows)) + log.Debugf("Chunk received, transforming %d rows...", len(rows)) for _, rowValues := range rows { for i, col := range columns { value := rowValues[i] - if col.SystemType() == "uniqueidentifier" { + + switch col.SystemType() { + case "uniqueidentifier": if b, ok := value.([]byte); ok { - rowValues[i] = mssqlUuidToBigEndian(b) + pgUuid, err := mssqlUuidToBigEndian(b) + if err != nil { + jobError := JobError{ + ShouldCancelJob: true, + Prev: err, + } + chJobErrorsOut <- jobError + return + } + rowValues[i] = pgUuid } - } else if col.SystemType() == "geometry" || col.SystemType() == "geography" { + + case "geometry", "geography": if b, ok := value.([]byte); ok { - rowValues[i] = wkbToEwkbWithSrid(b, 4326) + ewkb, err := wkbToEwkbWithSrid(b, 4326) + if err != nil { + jobError := JobError{ + ShouldCancelJob: true, + Prev: err, + } + chJobErrorsOut <- jobError + return + } + rowValues[i] = ewkb } - } else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" { + + case "datetime", "datetime2": if t, ok := value.(time.Time); ok { rowValues[i] = ensureUTC(t) } @@ -37,26 +64,9 @@ func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out totalRowsTransformed += len(rows) chunkDuration := time.Since(chunkStartTime) rowsPerSec := float64(len(rows)) / chunkDuration.Seconds() - log.Infof("Transformed chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows", - chunkCount, len(rows), chunkDuration, rowsPerSec, totalRowsTransformed) + log.Infof("Transformed chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", + len(rows), chunkDuration, rowsPerSec, totalRowsTransformed) - out <- rows - } -} - -func ToInt64(v any) (int64, bool) { - switch t := v.(type) { - case int: - return int64(t), true - case int8: - return int64(t), true - case int16: - return int64(t), true - case int32: - return int64(t), true - case int64: - return int64(t), true - default: - return 0, false + chChunksOut <- rows } }