feat: add MSSQL extractor and transformer implementations for improved data migration

This commit is contained in:
2026-04-10 19:59:44 -05:00
parent eb3c3bbfce
commit 053e6bd673
5 changed files with 114 additions and 74 deletions

View File

@@ -1,80 +0,0 @@
package main
import (
"encoding/binary"
"errors"
"time"
)
// mssqlUuidToBigEndian converts a 16-byte MSSQL uniqueidentifier into the
// big-endian (RFC 4122 wire order) layout used by PostgreSQL's uuid type.
// MSSQL stores the first three groups (4 + 2 + 2 bytes) little-endian, so
// each of those groups is byte-reversed; the final 8 bytes are copied as-is.
//
// It returns an error when the input is not exactly 16 bytes. The input
// slice is never modified; a new slice is returned.
func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
	if len(mssqlUuid) != 16 {
		// Go convention: error strings are lowercase, no punctuation.
		return nil, errors.New("invalid uuid: expected 16 bytes")
	}
	pgUuid := make([]byte, 16)
	// Reverse the three little-endian groups.
	pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0]
	pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4]
	pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6]
	// Bytes 8-15 are already in network order.
	copy(pgUuid[8:], mssqlUuid[8:])
	return pgUuid, nil
}
// sridFlag is the PostGIS EWKB type-word flag bit signalling that a 4-byte
// SRID immediately follows the geometry type.
const sridFlag = 0x20000000

// wkbToEwkbWithSrid converts a WKB geometry into PostGIS EWKB by setting the
// SRID flag on the type word and inserting the given srid after it. If the
// geometry already carries an SRID flag it is returned unchanged (note: the
// returned slice then aliases the input). The SRID is written in the same
// byte order the geometry declares in its first byte (0 = big-endian/XDR,
// otherwise little-endian/NDR).
//
// It returns an error when the input is too short to hold the 5-byte
// byte-order + type header.
func wkbToEwkbWithSrid(geometry []byte, srid int) ([]byte, error) {
	if len(geometry) < 5 {
		// Go convention: error strings are lowercase, no punctuation.
		return nil, errors.New("invalid wkb: shorter than 5-byte header")
	}
	var byteOrder binary.ByteOrder
	if geometry[0] == 0 {
		byteOrder = binary.BigEndian
	} else {
		byteOrder = binary.LittleEndian
	}
	wkbType := byteOrder.Uint32(geometry[1:5])
	if wkbType&sridFlag != 0 {
		// Already EWKB with an embedded SRID; pass through untouched.
		return geometry, nil
	}
	result := make([]byte, len(geometry)+4)
	result[0] = geometry[0]
	byteOrder.PutUint32(result[1:5], wkbType|sridFlag)
	byteOrder.PutUint32(result[5:9], uint32(srid))
	copy(result[9:], geometry[5:])
	return result, nil
}
func ensureUTC(t time.Time) time.Time {
if t.Location() == time.UTC {
return t
}
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
// ToInt64 converts any signed-integer value to int64. The boolean reports
// whether v was one of the supported types; unsigned integers, floats,
// strings, nil, and everything else yield (0, false). Unsigned types are
// deliberately excluded: a uint64 above math.MaxInt64 cannot be represented.
func ToInt64(v any) (int64, bool) {
	switch t := v.(type) {
	case int:
		return int64(t), true
	case int8:
		return int64(t), true
	case int16:
		return int64(t), true
	case int32:
		return int64(t), true
	case int64:
		// Already an int64; the original's int64(t) conversion was redundant.
		return t, true
	default:
		return 0, false
	}
}

View File

@@ -10,6 +10,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformer"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
@@ -60,6 +61,9 @@ func processMigrationJob(
var wgTransformers sync.WaitGroup
var wgLoaders sync.WaitGroup
mssqlExtractor := extractor.NewMssqlExtractor(sourceDb)
mssqlToPostgresTransformer := transformer.NewMssqlTransformer()
go func() {
if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
cancel()
@@ -73,11 +77,9 @@ func processMigrationJob(
maxExtractors := min(job.MaxExtractors, len(batches))
log.Infof("Starting %d extractor(s)...", maxExtractors)
exMssql := extractor.NewMssqlExtractor(sourceDb)
for range maxExtractors {
wgExtractors.Go(func() {
exMssql.Exec(
mssqlExtractor.Exec(
jobCtx,
job.SourceTable,
sourceColTypes,
@@ -103,7 +105,14 @@ func processMigrationJob(
for range maxExtractors {
wgTransformers.Go(func() {
transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks)
mssqlToPostgresTransformer.Exec(
jobCtx,
sourceColTypes,
chChunksRaw,
chChunksTransformed,
chJobErrors,
&wgActiveChunks,
)
})
}

View File

@@ -1,151 +0,0 @@
package main
import (
"context"
"errors"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus"
)
// transformerFunc rewrites a single column value, returning the converted
// value or an error.
type transformerFunc func(any) (any, error)
// columnTransformPlan binds a transformer to the row index of the column it
// applies to.
type columnTransformPlan struct {
// index is the position of the column within each row slice.
index int
// fn is the conversion applied to non-nil values of that column.
fn transformerFunc
}
// transformRowsMssql consumes raw chunks from chChunksIn, applies the
// per-column conversions needed to make MSSQL values Postgres-compatible,
// and forwards the results on chChunksOut. It runs until chChunksIn is
// closed or ctx is cancelled. A fatal transformation error is reported on
// chJobErrorsOut with ShouldCancelJob set, then the worker exits.
//
// NOTE(review): wgActiveChunks appears to count chunks in flight downstream
// (presumably Done() is called by the loader) — confirm against the caller.
func transformRowsMssql(
	ctx context.Context,
	columns []models.ColumnType,
	chChunksIn <-chan models.Chunk,
	chChunksOut chan<- models.Chunk,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActiveChunks *sync.WaitGroup,
) {
	transformationPlan := computeTransformationPlan(columns)
	for {
		if ctx.Err() != nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case chunk, ok := <-chChunksIn:
			if !ok {
				return
			}
			if len(transformationPlan) == 0 {
				// Nothing to transform for this table; forward as-is.
				if !forwardChunk(ctx, chunk, chChunksOut, wgActiveChunks) {
					return
				}
				continue
			}
			chunkStartTime := time.Now()
			if err := processChunk(ctx, &chunk, transformationPlan); err != nil {
				if errors.Is(err, ctx.Err()) {
					// Cancelled mid-chunk; exit quietly.
					return
				}
				select {
				case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
				case <-ctx.Done():
				}
				return
			}
			log.Infof("Transformed chunk %s: %d rows in %v", chunk.Id, len(chunk.Data), time.Since(chunkStartTime))
			if !forwardChunk(ctx, chunk, chChunksOut, wgActiveChunks) {
				return
			}
		}
	}
}

// forwardChunk registers the chunk on wgActiveChunks BEFORE handing it off.
// The original code called Add(1) after the send, which races: a downstream
// consumer can receive the chunk and call Done() before Add runs, driving
// the counter negative or letting Wait return while a chunk is in flight
// (sync.WaitGroup requires Add to happen before the handoff). If ctx is
// cancelled before the send completes, the registration is rolled back.
// It returns false when the caller should stop.
func forwardChunk(ctx context.Context, chunk models.Chunk, chChunksOut chan<- models.Chunk, wgActiveChunks *sync.WaitGroup) bool {
	wgActiveChunks.Add(1)
	select {
	case chChunksOut <- chunk:
		return true
	case <-ctx.Done():
		wgActiveChunks.Done() // chunk never entered the pipeline
		return false
	}
}
// computeTransformationPlan inspects the source column types and builds the
// list of per-column conversions required before loading into Postgres:
// uniqueidentifier bytes are re-ordered to big-endian, geometry/geography
// WKB is tagged with SRID 4326 as EWKB, and datetime/datetime2 values are
// reinterpreted as UTC. Columns of any other type are left untouched, so an
// empty plan means the table needs no transformation.
func computeTransformationPlan(columns []models.ColumnType) []columnTransformPlan {
	var plan []columnTransformPlan
	add := func(idx int, fn transformerFunc) {
		plan = append(plan, columnTransformPlan{index: idx, fn: fn})
	}
	for idx, col := range columns {
		switch col.SystemType() {
		case "uniqueidentifier":
			add(idx, func(v any) (any, error) {
				b, ok := v.([]byte)
				if !ok || b == nil {
					return v, nil // non-byte or NULL values pass through
				}
				return mssqlUuidToBigEndian(b)
			})
		case "geometry", "geography":
			add(idx, func(v any) (any, error) {
				b, ok := v.([]byte)
				if !ok || b == nil {
					return v, nil
				}
				return wkbToEwkbWithSrid(b, 4326)
			})
		case "datetime", "datetime2":
			add(idx, func(v any) (any, error) {
				t, ok := v.(time.Time)
				if !ok {
					return v, nil
				}
				return ensureUTC(t), nil
			})
		}
	}
	return plan
}
// processChunkCtxCheck is the number of rows processed between context
// polls inside processChunk.
const processChunkCtxCheck = 4096

// processChunk applies each planned column transformation to every row of
// the chunk, mutating the rows in place. Cancellation is sampled every
// processChunkCtxCheck rows (checking per row would be wasteful); on
// cancellation ctx's error is returned, and the first transformer error
// aborts the remainder of the chunk.
func processChunk(ctx context.Context, chunk *models.Chunk, transformationPlan []columnTransformPlan) error {
	for rowIdx, row := range chunk.Data {
		if rowIdx%processChunkCtxCheck == 0 {
			if err := ctx.Err(); err != nil {
				return err
			}
		}
		for _, step := range transformationPlan {
			cell := row[step.index]
			if cell == nil {
				continue // NULLs pass through untouched
			}
			converted, err := step.fn(cell)
			if err != nil {
				return err
			}
			row[step.index] = converted
		}
	}
	return nil
}