150 lines
2.8 KiB
Go
150 lines
2.8 KiB
Go
package transformer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
)
|
|
|
|
// MssqlTransformer holds no state; its methods convert MSSQL-typed column
// values in row chunks according to a plan built by computeTransformationPlan.
type MssqlTransformer struct{}
|
|
|
|
func NewMssqlTransformer() Transformer {
|
|
return &MssqlTransformer{}
|
|
}
|
|
|
|
func computeTransformationPlan(columns []models.ColumnType) []columnTransformPlan {
|
|
var plan []columnTransformPlan
|
|
|
|
for i, col := range columns {
|
|
switch col.SystemType() {
|
|
case "uniqueidentifier":
|
|
plan = append(plan, columnTransformPlan{
|
|
index: i,
|
|
fn: func(v any) (any, error) {
|
|
if b, ok := v.([]byte); ok && b != nil {
|
|
return mssqlUuidToBigEndian(b)
|
|
}
|
|
return v, nil
|
|
},
|
|
})
|
|
|
|
case "geometry", "geography":
|
|
plan = append(plan, columnTransformPlan{
|
|
index: i,
|
|
fn: func(v any) (any, error) {
|
|
if b, ok := v.([]byte); ok && b != nil {
|
|
return wkbToEwkbWithSrid(b, 4326)
|
|
}
|
|
return v, nil
|
|
},
|
|
})
|
|
|
|
case "datetime", "datetime2":
|
|
plan = append(plan, columnTransformPlan{
|
|
index: i,
|
|
fn: func(v any) (any, error) {
|
|
if t, ok := v.(time.Time); ok {
|
|
return ensureUTC(t), nil
|
|
}
|
|
return v, nil
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
return plan
|
|
}
|
|
|
|
// processChunkCtxCheck is the row interval at which ProcessChunk polls the
// context for cancellation: the check runs on every row whose index is a
// multiple of this value (including row 0).
const processChunkCtxCheck = 4096
|
|
|
|
func (mssqlTr *MssqlTransformer) ProcessChunk(
|
|
ctx context.Context,
|
|
chunk *models.Chunk,
|
|
transformationPlan []columnTransformPlan,
|
|
) error {
|
|
for i, rowValues := range chunk.Data {
|
|
if i%processChunkCtxCheck == 0 {
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, task := range transformationPlan {
|
|
val := rowValues[task.index]
|
|
if val == nil {
|
|
continue
|
|
}
|
|
|
|
transformed, err := task.fn(val)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rowValues[task.index] = transformed
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (mssqlTr *MssqlTransformer) Exec(
|
|
ctx context.Context,
|
|
columns []models.ColumnType,
|
|
chChunksIn <-chan models.Chunk,
|
|
chChunksOut chan<- models.Chunk,
|
|
chJobErrorsOut chan<- custom_errors.JobError,
|
|
wgActiveChunks *sync.WaitGroup,
|
|
) {
|
|
transformationPlan := computeTransformationPlan(columns)
|
|
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case chunk, ok := <-chChunksIn:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if len(transformationPlan) == 0 {
|
|
select {
|
|
case chChunksOut <- chunk:
|
|
wgActiveChunks.Add(1)
|
|
continue
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
err := mssqlTr.ProcessChunk(ctx, &chunk, transformationPlan)
|
|
if err != nil {
|
|
if errors.Is(err, ctx.Err()) {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
|
|
case <-ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
|
|
select {
|
|
case chChunksOut <- chunk:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
wgActiveChunks.Add(1)
|
|
}
|
|
}
|
|
}
|