feat: update chunk size for MSSQL processing and enhance error handling in transformation functions

This commit is contained in:
2026-04-08 20:48:36 -05:00
parent eeef3bc813
commit 853be4a5a6
4 changed files with 69 additions and 40 deletions

View File

@@ -28,7 +28,7 @@ var migrationJobs []MigrationJob = []MigrationJob{
const ( const (
NumExtractors int = 4 NumExtractors int = 4
NumLoaders int = 8 NumLoaders int = 8
ChunkSize int = 25000 ChunkSize int = 50000
QueueSize int = 8 QueueSize int = 8
ChunksPerBatch int = 16 ChunksPerBatch int = 16
RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch) RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch)

View File

@@ -2,27 +2,29 @@ package main
import ( import (
"encoding/binary" "encoding/binary"
"errors"
"time" "time"
) )
func mssqlUuidToBigEndian(mssqlUuid []byte) []byte { func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
if len(mssqlUuid) != 16 { if len(mssqlUuid) != 16 {
return mssqlUuid return nil, errors.New("Invalid uuid")
} }
pgUuid := make([]byte, 16) pgUuid := make([]byte, 16)
pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0] pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0]
pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4] pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4]
pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6] pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6]
copy(pgUuid[8:], mssqlUuid[8:]) copy(pgUuid[8:], mssqlUuid[8:])
return pgUuid return pgUuid, nil
} }
const sridFlag = 0x20000000 const sridFlag = 0x20000000
func wkbToEwkbWithSrid(geometry []byte, srid int) []byte { func wkbToEwkbWithSrid(geometry []byte, srid int) ([]byte, error) {
if len(geometry) < 5 { if len(geometry) < 5 {
return geometry return nil, errors.New("Invalid wkb")
} }
var byteOrder binary.ByteOrder var byteOrder binary.ByteOrder
@@ -34,7 +36,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
wkbType := byteOrder.Uint32(geometry[1:5]) wkbType := byteOrder.Uint32(geometry[1:5])
if wkbType&sridFlag != 0 { if wkbType&sridFlag != 0 {
return geometry return geometry, nil
} }
ewkbType := wkbType | sridFlag ewkbType := wkbType | sridFlag
@@ -49,7 +51,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
copy(result[9:], geometry[5:]) copy(result[9:], geometry[5:])
return result return result, nil
} }
func ensureUTC(t time.Time) time.Time { func ensureUTC(t time.Time) time.Time {
@@ -59,3 +61,20 @@ func ensureUTC(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
} }
func ToInt64(v any) (int64, bool) {
switch t := v.(type) {
case int:
return int64(t), true
case int8:
return int64(t), true
case int16:
return int64(t), true
case int32:
return int64(t), true
case int64:
return int64(t), true
default:
return 0, false
}
}

View File

@@ -74,20 +74,20 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
log.Infof("Extraction completed in %v", time.Since(extractStartTime)) log.Infof("Extraction completed in %v", time.Since(extractStartTime))
}() }()
chRowsTransform := make(chan []UnknownRowValues, QueueSize) chChunksTransform := make(chan []UnknownRowValues, QueueSize)
var wgMssqlTransformers sync.WaitGroup var wgMssqlTransformers sync.WaitGroup
log.Infof("Starting %d MSSQL transformers...", maxExtractors) log.Infof("Starting %d MSSQL transformers...", maxExtractors)
transformStartTime := time.Now() transformStartTime := time.Now()
for range maxExtractors { for range maxExtractors {
wgMssqlTransformers.Go(func() { wgMssqlTransformers.Go(func() {
transformRowsMssql(sourceColTypes, chChunks, chRowsTransform) transformRowsMssql(sourceColTypes, chChunks, chChunksTransform, chJobErrors)
}) })
} }
go func() { go func() {
wgMssqlTransformers.Wait() wgMssqlTransformers.Wait()
close(chRowsTransform) close(chChunksTransform)
log.Infof("Transformation completed in %v", time.Since(transformStartTime)) log.Infof("Transformation completed in %v", time.Since(transformStartTime))
}() }()
@@ -98,7 +98,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
for range NumLoaders { for range NumLoaders {
wgPostgresLoaders.Go(func() { wgPostgresLoaders.Go(func() {
if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chRowsTransform); err != nil { if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chChunksTransform); err != nil {
log.Error("Unexpected error loading data into postgres: ", err) log.Error("Unexpected error loading data into postgres: ", err)
} }
}) })

View File

@@ -6,26 +6,53 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) { func transformRowsMssql(
columns []ColumnType,
chChunksIn <-chan []UnknownRowValues,
chChunksOut chan<- []UnknownRowValues,
chJobErrorsOut chan<- JobError,
) {
chunkCount := 0 chunkCount := 0
totalRowsTransformed := 0 totalRowsTransformed := 0
for rows := range in { for rows := range chChunksIn {
chunkStartTime := time.Now() chunkStartTime := time.Now()
log.Debugf("Chunk #%d received, transforming %d rows...", chunkCount+1, len(rows)) log.Debugf("Chunk received, transforming %d rows...", len(rows))
for _, rowValues := range rows { for _, rowValues := range rows {
for i, col := range columns { for i, col := range columns {
value := rowValues[i] value := rowValues[i]
if col.SystemType() == "uniqueidentifier" {
switch col.SystemType() {
case "uniqueidentifier":
if b, ok := value.([]byte); ok { if b, ok := value.([]byte); ok {
rowValues[i] = mssqlUuidToBigEndian(b) pgUuid, err := mssqlUuidToBigEndian(b)
if err != nil {
jobError := JobError{
ShouldCancelJob: true,
Prev: err,
}
chJobErrorsOut <- jobError
return
}
rowValues[i] = pgUuid
} }
} else if col.SystemType() == "geometry" || col.SystemType() == "geography" {
case "geometry", "geography":
if b, ok := value.([]byte); ok { if b, ok := value.([]byte); ok {
rowValues[i] = wkbToEwkbWithSrid(b, 4326) ewkb, err := wkbToEwkbWithSrid(b, 4326)
if err != nil {
jobError := JobError{
ShouldCancelJob: true,
Prev: err,
}
chJobErrorsOut <- jobError
return
}
rowValues[i] = ewkb
} }
} else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" {
case "datetime", "datetime2":
if t, ok := value.(time.Time); ok { if t, ok := value.(time.Time); ok {
rowValues[i] = ensureUTC(t) rowValues[i] = ensureUTC(t)
} }
@@ -37,26 +64,9 @@ func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out
totalRowsTransformed += len(rows) totalRowsTransformed += len(rows)
chunkDuration := time.Since(chunkStartTime) chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds() rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
log.Infof("Transformed chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows", log.Infof("Transformed chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
chunkCount, len(rows), chunkDuration, rowsPerSec, totalRowsTransformed) len(rows), chunkDuration, rowsPerSec, totalRowsTransformed)
out <- rows chChunksOut <- rows
}
}
func ToInt64(v any) (int64, bool) {
switch t := v.(type) {
case int:
return int64(t), true
case int8:
return int64(t), true
case int16:
return int64(t), true
case int32:
return int64(t), true
case int64:
return int64(t), true
default:
return 0, false
} }
} }