refactor: replace Exec with Consume method in MssqlTransformer; enhance retry handling and streamline transformation logic
This commit is contained in:
@@ -1,20 +1,9 @@
|
||||
package transformers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type MssqlTransformer struct {
|
||||
@@ -30,196 +19,3 @@ func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.So
|
||||
azureClient: azureClient,
|
||||
}
|
||||
}
|
||||
|
||||
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
||||
var plan []etl.ColumnTransformPlan
|
||||
|
||||
for i, col := range columns {
|
||||
switch col.SystemType() {
|
||||
case "uniqueidentifier":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return mssqlUuidToBigEndian(b)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "geometry", "geography":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return wkbToEwkbWithSrid(b, 4326)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "datetime", "datetime2":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if t, ok := v.(time.Time); ok {
|
||||
return ensureUTC(t), nil
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return plan
|
||||
}
|
||||
|
||||
func computeStorageTransformationPlan(
|
||||
ctx context.Context,
|
||||
azureClient *azure.Client,
|
||||
toStorage config.ToStorageConfig,
|
||||
sourceColumns []models.ColumnType,
|
||||
sourceTable config.SourceTableInfo,
|
||||
) []etl.ColumnTransformPlan {
|
||||
if azureClient == nil || len(toStorage.Columns) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
colIndex := make(map[string]int, len(sourceColumns))
|
||||
for i, col := range sourceColumns {
|
||||
colIndex[strings.ToUpper(col.Name())] = i
|
||||
}
|
||||
|
||||
var plan []etl.ColumnTransformPlan
|
||||
for _, storageCol := range toStorage.Columns {
|
||||
if storageCol.Mode != "REFERENCE_ONLY" {
|
||||
log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
|
||||
if !ok {
|
||||
log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
sourceColName := storageCol.Source
|
||||
schema := sourceTable.Schema
|
||||
table := sourceTable.Table
|
||||
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: idx,
|
||||
Fn: func(v any) (any, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
b, ok := v.([]byte)
|
||||
if !ok {
|
||||
log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
|
||||
schema, table, sourceColName, v)
|
||||
return v, nil
|
||||
}
|
||||
start := time.Now()
|
||||
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
|
||||
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
|
||||
}
|
||||
log.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
|
||||
return blobURL, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
return plan
|
||||
}
|
||||
|
||||
const processBatchCtxCheck = 4096
|
||||
|
||||
func (mssqlTr *MssqlTransformer) ProcessBatch(
|
||||
ctx context.Context,
|
||||
batch *models.Batch,
|
||||
transformationPlan []etl.ColumnTransformPlan,
|
||||
) error {
|
||||
for i, rowValues := range batch.Rows {
|
||||
if i%processBatchCtxCheck == 0 {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, task := range transformationPlan {
|
||||
val := rowValues[task.Index]
|
||||
if val == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
transformed, err := task.Fn(val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rowValues[task.Index] = transformed
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mssqlTr *MssqlTransformer) Exec(
|
||||
ctx context.Context,
|
||||
columns []models.ColumnType,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
) {
|
||||
transformationPlan := computeTransformationPlan(columns)
|
||||
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
|
||||
transformationPlan = append(transformationPlan, storagePlan...)
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if len(transformationPlan) == 0 {
|
||||
select {
|
||||
case chBatchesOut <- batch:
|
||||
wgActiveBatches.Add(1)
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
|
||||
if err != nil {
|
||||
if errors.Is(err, ctx.Err()) {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case chBatchesOut <- batch:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
wgActiveBatches.Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user