// Package transformers contains the per-source-database Transformer
// implementations used by the ETL pipeline.
package transformers

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
)

// MssqlTransformer rewrites MSSQL-specific column values (GUID byte order,
// WKB geometry, naive datetimes) and optionally offloads configured columns
// to Azure blob storage, replacing the value with the blob URL.
type MssqlTransformer struct {
	toStorage   config.ToStorageConfig
	sourceTable config.SourceTableInfo
	azureClient *azure.Client
}

// NewMssqlTransformer builds an MSSQL transformer. azureClient may be nil,
// in which case no to-storage offloading is performed.
func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.SourceTableInfo, azureClient *azure.Client) etl.Transformer {
	return &MssqlTransformer{
		toStorage:   toStorage,
		sourceTable: sourceTable,
		azureClient: azureClient,
	}
}

// computeTransformationPlan inspects the source column types once, up front,
// and returns per-column transform functions for the types that need
// normalization before loading:
//   - uniqueidentifier: MSSQL stores the first three GUID groups little-endian;
//     convert to the big-endian textual ordering.
//   - geometry/geography: convert WKB to EWKB, stamping SRID 4326.
//   - datetime/datetime2: normalize to UTC.
//
// Columns of other types get no plan entry and pass through untouched.
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
	var plan []etl.ColumnTransformPlan
	for i, col := range columns {
		switch col.SystemType() {
		case "uniqueidentifier":
			plan = append(plan, etl.ColumnTransformPlan{
				Index: i,
				Fn: func(v any) (any, error) {
					if b, ok := v.([]byte); ok && b != nil {
						return mssqlUuidToBigEndian(b)
					}
					return v, nil
				},
			})
		case "geometry", "geography":
			plan = append(plan, etl.ColumnTransformPlan{
				Index: i,
				Fn: func(v any) (any, error) {
					if b, ok := v.([]byte); ok && b != nil {
						return wkbToEwkbWithSrid(b, 4326)
					}
					return v, nil
				},
			})
		case "datetime", "datetime2":
			plan = append(plan, etl.ColumnTransformPlan{
				Index: i,
				Fn: func(v any) (any, error) {
					if t, ok := v.(time.Time); ok {
						return ensureUTC(t), nil
					}
					return v, nil
				},
			})
		}
	}
	return plan
}

// computeStorageTransformationPlan builds transform entries for columns the
// config routes to blob storage (mode REFERENCE_ONLY): the []byte cell value
// is uploaded to Azure and replaced by the resulting blob URL. Returns nil
// when there is no Azure client or no configured columns. Misconfigured
// entries (unknown mode, missing source column) are logged and skipped so
// one bad column does not abort the whole job.
func computeStorageTransformationPlan(
	ctx context.Context,
	azureClient *azure.Client,
	toStorage config.ToStorageConfig,
	sourceColumns []models.ColumnType,
	sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
	if azureClient == nil || len(toStorage.Columns) == 0 {
		return nil
	}

	// Case-insensitive lookup of source column name -> row index.
	colIndex := make(map[string]int, len(sourceColumns))
	for i, col := range sourceColumns {
		colIndex[strings.ToUpper(col.Name())] = i
	}

	var plan []etl.ColumnTransformPlan
	for _, storageCol := range toStorage.Columns {
		if storageCol.Mode != "REFERENCE_ONLY" {
			log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
			continue
		}
		idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
		if !ok {
			log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
			continue
		}

		// Capture per-iteration copies so each closure sees its own column
		// (required on pre-1.22 toolchains where the loop variable is shared).
		sourceColName := storageCol.Source
		schema := sourceTable.Schema
		table := sourceTable.Table

		plan = append(plan, etl.ColumnTransformPlan{
			Index: idx,
			Fn: func(v any) (any, error) {
				if v == nil {
					return nil, nil
				}
				b, ok := v.([]byte)
				if !ok {
					// Non-binary payloads are forwarded unchanged rather than
					// failing the batch; the config likely targets the wrong column.
					log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through", schema, table, sourceColName, v)
					return v, nil
				}
				start := time.Now()
				// Random blob name: rows have no visible natural key here.
				blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
				blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
				if err != nil {
					return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
				}
				log.Debugf(`Successfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
				return blobURL, nil
			},
		})
	}
	return plan
}

// processBatchCtxCheck is how often (in rows) ProcessBatch polls the context,
// so cancellation is noticed promptly without paying a per-row check.
const processBatchCtxCheck = 4096

// ProcessBatch applies every planned column transform to every row of the
// batch, mutating batch.Rows in place. nil cells are skipped. Returns the
// context error if the job is cancelled mid-batch, or the first transform
// error encountered.
func (mssqlTr *MssqlTransformer) ProcessBatch(
	ctx context.Context,
	batch *models.Batch,
	transformationPlan []etl.ColumnTransformPlan,
) error {
	for i, rowValues := range batch.Rows {
		if i%processBatchCtxCheck == 0 {
			if err := ctx.Err(); err != nil {
				return err
			}
		}
		for _, task := range transformationPlan {
			val := rowValues[task.Index]
			if val == nil {
				continue
			}
			transformed, err := task.Fn(val)
			if err != nil {
				return err
			}
			rowValues[task.Index] = transformed
		}
	}
	return nil
}

// Exec is the transformer pipeline stage: it consumes batches from
// chBatchesIn, applies the type and to-storage transformation plans, and
// forwards transformed batches on chBatchesOut until the input channel is
// closed or the context is cancelled. Fatal transform errors are reported
// once on chJobErrorsOut with ShouldCancelJob set, then the stage exits.
func (mssqlTr *MssqlTransformer) Exec(
	ctx context.Context,
	columns []models.ColumnType,
	chBatchesIn <-chan models.Batch,
	chBatchesOut chan<- models.Batch,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActiveBatches *sync.WaitGroup,
) {
	// The plans are pure lookups over the schema/config; compute them once.
	transformationPlan := computeTransformationPlan(columns)
	storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
	transformationPlan = append(transformationPlan, storagePlan...)

	for {
		// Bias toward exiting promptly when cancelled, even if a batch is
		// also ready (select alone would choose randomly between the two).
		if ctx.Err() != nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case batch, ok := <-chBatchesIn:
			if !ok {
				return // upstream finished; nothing left to transform
			}

			if len(transformationPlan) > 0 {
				if err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan); err != nil {
					// A cancellation surfaced through ProcessBatch is not a
					// job failure — just stop quietly.
					if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
						return
					}
					select {
					case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
					case <-ctx.Done():
					}
					return
				}
			}

			// Register the in-flight batch BEFORE it becomes visible
			// downstream: if the consumer calls Done() on receipt, an Add()
			// placed after the send could race behind that Done() and drive
			// the WaitGroup counter negative. (Assumes downstream Done()s
			// each forwarded batch — NOTE(review): confirm against the loader.)
			wgActiveBatches.Add(1)
			select {
			case chBatchesOut <- batch:
			case <-ctx.Done():
				// Batch was never delivered; rebalance the counter.
				wgActiveBatches.Done()
				return
			}
		}
	}
}