// NOTE: the following header was file-viewer metadata pasted into the source
// ("Files — 226 lines, 5.2 KiB, Go"); kept as a comment so the file compiles.
package transformers
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
// MssqlTransformer is the MSSQL-specific etl.Transformer implementation.
// It rewrites column values that need normalization before loading
// (UUID byte order, WKB→EWKB, UTC timestamps) and can optionally offload
// large binary columns to Azure blob storage, replacing the cell value
// with the blob URL.
type MssqlTransformer struct {
	toStorage   config.ToStorageConfig // columns to offload to blob storage
	sourceTable config.SourceTableInfo // source schema/table identity; used to namespace blob paths
	azureClient *azure.Client          // nil disables the to-storage transformation plan
}
// NewMssqlTransformer constructs an MSSQL transformer wired with the given
// to-storage configuration, source table identity, and (optionally nil)
// Azure client, returned as the generic etl.Transformer interface.
func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.SourceTableInfo, azureClient *azure.Client) etl.Transformer {
	transformer := MssqlTransformer{
		toStorage:   toStorage,
		sourceTable: sourceTable,
		azureClient: azureClient,
	}
	return &transformer
}
// computeTransformationPlan builds the per-column value transformers required
// by the MSSQL source schema: uniqueidentifier columns get their byte order
// normalized, geometry/geography columns are converted from WKB to EWKB with
// SRID 4326, and datetime/datetime2 columns are coerced to UTC. Columns of
// any other system type get no transform. Returns nil when nothing needs
// transforming.
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
	var plan []etl.ColumnTransformPlan

	// add registers a transform for the column at index idx.
	add := func(idx int, fn func(any) (any, error)) {
		plan = append(plan, etl.ColumnTransformPlan{Index: idx, Fn: fn})
	}

	for idx, col := range columns {
		switch col.SystemType() {
		case "uniqueidentifier":
			add(idx, func(v any) (any, error) {
				b, ok := v.([]byte)
				if !ok || b == nil {
					return v, nil
				}
				return mssqlUuidToBigEndian(b)
			})
		case "geometry", "geography":
			add(idx, func(v any) (any, error) {
				b, ok := v.([]byte)
				if !ok || b == nil {
					return v, nil
				}
				return wkbToEwkbWithSrid(b, 4326)
			})
		case "datetime", "datetime2":
			add(idx, func(v any) (any, error) {
				t, ok := v.(time.Time)
				if !ok {
					return v, nil
				}
				return ensureUTC(t), nil
			})
		}
	}
	return plan
}
// computeStorageTransformationPlan builds transformers that offload configured
// binary columns to Azure blob storage, replacing each cell value with the
// resulting blob URL (REFERENCE_ONLY mode).
//
// Returns nil when no Azure client is configured or no storage columns are
// requested. Columns with an unsupported mode, or whose source name does not
// match the source schema (case-insensitively), are skipped with a warning
// rather than failing the job. Nil cells pass through as nil; non-[]byte
// cells pass through unchanged with a warning.
func computeStorageTransformationPlan(
	ctx context.Context,
	azureClient *azure.Client,
	toStorage config.ToStorageConfig,
	sourceColumns []models.ColumnType,
	sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
	if azureClient == nil || len(toStorage.Columns) == 0 {
		return nil
	}

	// Case-insensitive lookup from source column name to its row index.
	colIndex := make(map[string]int, len(sourceColumns))
	for i, col := range sourceColumns {
		colIndex[strings.ToUpper(col.Name())] = i
	}

	// Loop-invariant: blob paths and log messages are namespaced by the
	// source table identity. Hoisted out of the loop (was recomputed per
	// column).
	schema := sourceTable.Schema
	table := sourceTable.Table

	var plan []etl.ColumnTransformPlan
	for _, storageCol := range toStorage.Columns {
		if storageCol.Mode != "REFERENCE_ONLY" {
			log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
			continue
		}
		idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
		if !ok {
			log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
			continue
		}
		// Copy for the closure: storageCol is reused across iterations on
		// Go < 1.22.
		sourceColName := storageCol.Source
		plan = append(plan, etl.ColumnTransformPlan{
			Index: idx,
			Fn: func(v any) (any, error) {
				if v == nil {
					return nil, nil
				}
				b, ok := v.([]byte)
				if !ok {
					log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
						schema, table, sourceColName, v)
					return v, nil
				}
				start := time.Now()
				// Random blob name avoids collisions; path is scoped by
				// schema/table for traceability.
				blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
				blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
				if err != nil {
					return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
				}
				// Fixed typo in log message ("Succesfully" → "Successfully").
				log.Debugf(`Successfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
				return blobURL, nil
			},
		})
	}
	return plan
}
// processBatchCtxCheck is the number of rows processed between context
// cancellation checks inside ProcessBatch.
const processBatchCtxCheck = 4096

// ProcessBatch applies every transform in transformationPlan to each row of
// batch, mutating the rows in place. Nil cell values are skipped untouched.
// Returns the context's error if cancellation is observed mid-batch, or the
// first transform error encountered; nil on success.
func (mssqlTr *MssqlTransformer) ProcessBatch(
	ctx context.Context,
	batch *models.Batch,
	transformationPlan []etl.ColumnTransformPlan,
) error {
	for rowNum, row := range batch.Rows {
		// Poll for cancellation periodically rather than on every row.
		if rowNum%processBatchCtxCheck == 0 {
			if err := ctx.Err(); err != nil {
				return err
			}
		}
		for _, step := range transformationPlan {
			cell := row[step.Index]
			if cell == nil {
				continue
			}
			newVal, err := step.Fn(cell)
			if err != nil {
				return err
			}
			row[step.Index] = newVal
		}
	}
	return nil
}
// Exec is the transformer stage of the ETL pipeline. It drains batches from
// chBatchesIn, applies the column transformation plan (type normalization
// plus optional to-storage offloading), and forwards transformed batches to
// chBatchesOut. Fatal transform errors are reported on chJobErrorsOut with
// ShouldCancelJob set. Exec returns when chBatchesIn is closed or ctx is
// cancelled.
//
// wgActiveBatches counts batches in flight downstream; it is incremented
// here for each batch delivered to chBatchesOut and is expected to be
// decremented by the consumer.
func (mssqlTr *MssqlTransformer) Exec(
	ctx context.Context,
	columns []models.ColumnType,
	chBatchesIn <-chan models.Batch,
	chBatchesOut chan<- models.Batch,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActiveBatches *sync.WaitGroup,
) {
	transformationPlan := computeTransformationPlan(columns)
	storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
	transformationPlan = append(transformationPlan, storagePlan...)
	for {
		// Prefer a prompt exit over racing the select when both a batch and
		// cancellation are ready.
		if ctx.Err() != nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case batch, ok := <-chBatchesIn:
			if !ok {
				return // upstream finished; channel closed
			}
			if len(transformationPlan) > 0 {
				if err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan); err != nil {
					// Cancellation is an orderly shutdown, not a job error.
					// (Was errors.Is(err, ctx.Err()), which compares against a
					// possibly-nil target; match the sentinel errors directly.)
					if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
						return
					}
					select {
					case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
					case <-ctx.Done():
					}
					return
				}
			}
			// Register the batch as in-flight BEFORE handing it downstream.
			// Adding after the send races with the consumer's Done() and can
			// drive the WaitGroup counter negative (panic). See sync.WaitGroup
			// docs: Add must happen before the event being waited for.
			wgActiveBatches.Add(1)
			select {
			case chBatchesOut <- batch:
			case <-ctx.Done():
				// Batch was never delivered; roll back the in-flight count.
				wgActiveBatches.Done()
				return
			}
		}
	}
}