feat: refactor transformation logic in MSSQL processing to use context and improve error handling
This commit is contained in:
@@ -81,7 +81,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
|
||||
transformStartTime := time.Now()
|
||||
for range maxExtractors {
|
||||
wgMssqlTransformers.Go(func() {
|
||||
transformRowsMssql(sourceColTypes, chChunks, chChunksTransform, chJobErrors)
|
||||
transformRowsMssql(ctx, sourceColTypes, chChunks, chChunksTransform, chJobErrors)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,72 +1,144 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// transformerFunc converts a single raw MSSQL column value into its
// Postgres-compatible representation, or returns an error when the value
// cannot be converted.
type transformerFunc func(any) (any, error)

// columnTransformPlan binds a transformer to the index of the column it
// applies to, so the per-row loop only touches columns that actually need
// conversion work.
type columnTransformPlan struct {
	index int             // position of the column within a row's value slice
	fn    transformerFunc // conversion applied to values at index
}
|
||||
|
||||
func transformRowsMssql(
|
||||
ctx context.Context,
|
||||
columns []ColumnType,
|
||||
chChunksIn <-chan Chunk,
|
||||
chChunksOut chan<- Chunk,
|
||||
chJobErrorsOut chan<- JobError,
|
||||
) {
|
||||
chunkCount := 0
|
||||
totalRowsTransformed := 0
|
||||
transformationPlan := computeTransformationPlan(columns)
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case chunk, ok := <-chChunksIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if len(transformationPlan) == 0 {
|
||||
select {
|
||||
case chChunksOut <- chunk:
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for chunk := range chChunksIn {
|
||||
chunkStartTime := time.Now()
|
||||
log.Debugf("Chunk received, transforming %d rows...", len(chunk.Data))
|
||||
|
||||
for _, rowValues := range chunk.Data {
|
||||
for i, col := range columns {
|
||||
value := rowValues[i]
|
||||
|
||||
switch col.SystemType() {
|
||||
case "uniqueidentifier":
|
||||
if b, ok := value.([]byte); ok {
|
||||
pgUuid, err := mssqlUuidToBigEndian(b)
|
||||
err := processChunk(ctx, &chunk, transformationPlan)
|
||||
if err != nil {
|
||||
jobError := JobError{
|
||||
ShouldCancelJob: true,
|
||||
Prev: err,
|
||||
}
|
||||
chJobErrorsOut <- jobError
|
||||
if errors.Is(err, ctx.Err()) {
|
||||
return
|
||||
}
|
||||
rowValues[i] = pgUuid
|
||||
}
|
||||
|
||||
case "geometry", "geography":
|
||||
if b, ok := value.([]byte); ok {
|
||||
ewkb, err := wkbToEwkbWithSrid(b, 4326)
|
||||
if err != nil {
|
||||
jobError := JobError{
|
||||
ShouldCancelJob: true,
|
||||
Prev: err,
|
||||
select {
|
||||
case chJobErrorsOut <- JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
chJobErrorsOut <- jobError
|
||||
return
|
||||
}
|
||||
rowValues[i] = ewkb
|
||||
}
|
||||
|
||||
case "datetime", "datetime2":
|
||||
if t, ok := value.(time.Time); ok {
|
||||
rowValues[i] = ensureUTC(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Infof("Transformed chunk %s: %d rows in %v", chunk.Id, len(chunk.Data), time.Since(chunkStartTime))
|
||||
|
||||
chunkCount++
|
||||
totalRowsTransformed += len(chunk.Data)
|
||||
chunkDuration := time.Since(chunkStartTime)
|
||||
rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds()
|
||||
log.Infof("Transformed chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
|
||||
len(chunk.Data), chunkDuration, rowsPerSec, totalRowsTransformed)
|
||||
|
||||
chChunksOut <- chunk
|
||||
select {
|
||||
case chChunksOut <- chunk:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func computeTransformationPlan(columns []ColumnType) []columnTransformPlan {
|
||||
var plan []columnTransformPlan
|
||||
|
||||
for i, col := range columns {
|
||||
switch col.SystemType() {
|
||||
case "uniqueidentifier":
|
||||
plan = append(plan, columnTransformPlan{
|
||||
index: i,
|
||||
fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return mssqlUuidToBigEndian(b)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "geometry", "geography":
|
||||
plan = append(plan, columnTransformPlan{
|
||||
index: i,
|
||||
fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return wkbToEwkbWithSrid(b, 4326)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "datetime", "datetime2":
|
||||
plan = append(plan, columnTransformPlan{
|
||||
index: i,
|
||||
fn: func(v any) (any, error) {
|
||||
if t, ok := v.(time.Time); ok {
|
||||
return ensureUTC(t), nil
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return plan
|
||||
}
|
||||
|
||||
const processChunkCtxCheck = 4096
|
||||
|
||||
func processChunk(ctx context.Context, chunk *Chunk, transformationPlan []columnTransformPlan) error {
|
||||
for i, rowValues := range chunk.Data {
|
||||
if i%processChunkCtxCheck == 0 {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, task := range transformationPlan {
|
||||
val := rowValues[task.index]
|
||||
if val == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
transformed, err := task.fn(val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rowValues[task.index] = transformed
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user