package transformers

import (
	"context"
	"fmt"
	"strings"
	"time"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
	"github.com/sirupsen/logrus"
)

// computeTransformationPlan returns one etl.ColumnTransformPlan entry for every
// column whose SystemType() needs its values rewritten:
//
//   - "uniqueidentifier": non-nil []byte values are passed to mssqlUuidToBigEndian.
//   - "geometry", "geography": non-nil []byte values are passed to
//     wkbToEwkbWithSrid with SRID 4326.
//   - "datetime", "datetime2": time.Time values are passed to ensureUTC.
//
// Inside each transform, a value that does not have the expected dynamic type
// (including a nil value) is returned unchanged. Columns of any other system
// type get no entry, so the result is a nil slice when nothing matches.
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
	var plan []etl.ColumnTransformPlan

	for i, col := range columns {
		var fn func(v any) (any, error)

		switch col.SystemType() {
		case "uniqueidentifier":
			fn = func(v any) (any, error) {
				raw, isBytes := v.([]byte)
				if !isBytes || raw == nil {
					return v, nil
				}
				return mssqlUuidToBigEndian(raw)
			}

		case "geometry", "geography":
			fn = func(v any) (any, error) {
				raw, isBytes := v.([]byte)
				if !isBytes || raw == nil {
					return v, nil
				}
				return wkbToEwkbWithSrid(raw, 4326)
			}

		case "datetime", "datetime2":
			fn = func(v any) (any, error) {
				ts, isTime := v.(time.Time)
				if !isTime {
					return v, nil
				}
				return ensureUTC(ts), nil
			}

		default:
			// No transform required for this system type.
			continue
		}

		plan = append(plan, etl.ColumnTransformPlan{Index: i, Fn: fn})
	}

	return plan
}

// computeStorageTransformationPlan returns the transforms that replace []byte
// column values with the value returned by azureClient.UploadAndGetURL.
//
// It returns nil when azureClient is nil or toStorage lists no columns.
// Configured columns are matched against sourceColumns by upper-cased name.
// A configured column is skipped with a warning when its Mode is not
// "REFERENCE_ONLY" or when its Source is not present in sourceColumns.
//
// Each generated transform behaves as follows:
//
//   - a nil value yields (nil, nil);
//   - a value that is not a []byte is logged at warn level and returned as is;
//   - a []byte is uploaded to the path "<schema>/<table>/<new UUID>" and the
//     URL returned by the upload becomes the new value;
//   - an upload failure yields a *custom_errors.JobError whose Prev is the
//     underlying error.
//
// The ctx given here is captured and used for every upload the transforms make.
func computeStorageTransformationPlan(
	ctx context.Context,
	azureClient *azure.Client,
	toStorage config.ToStorageConfig,
	sourceColumns []models.ColumnType,
	sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
	if azureClient == nil || len(toStorage.Columns) == 0 {
		return nil
	}

	// Upper-cased column name -> position in sourceColumns.
	positions := make(map[string]int, len(sourceColumns))
	for pos, col := range sourceColumns {
		positions[strings.ToUpper(col.Name())] = pos
	}

	schema, table := sourceTable.Schema, sourceTable.Table

	// uploaderFor builds the upload transform for a single source column; the
	// column name is only used in log and error messages.
	uploaderFor := func(column string) func(v any) (any, error) {
		return func(v any) (any, error) {
			if v == nil {
				return nil, nil
			}

			payload, isBytes := v.([]byte)
			if !isBytes {
				logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through", schema, table, column, v)
				return v, nil
			}

			// A fresh UUID per value gives every uploaded blob its own path.
			blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())

			blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, payload)
			if err != nil {
				return nil, &custom_errors.JobError{
					Msg:  fmt.Sprintf("Error uploading %s.%s.%s", schema, table, column),
					Prev: err,
				}
			}

			return blobURL, nil
		}
	}

	var plan []etl.ColumnTransformPlan
	for _, storageCol := range toStorage.Columns {
		if storageCol.Mode != "REFERENCE_ONLY" {
			logrus.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
			continue
		}

		idx, found := positions[strings.ToUpper(storageCol.Source)]
		if !found {
			logrus.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
			continue
		}

		plan = append(plan, etl.ColumnTransformPlan{
			Index: idx,
			Fn:    uploaderFor(storageCol.Source),
		})
	}

	return plan
}