diff --git a/.atl/skill-registry.md b/.atl/skill-registry.md new file mode 100644 index 0000000..2f91391 --- /dev/null +++ b/.atl/skill-registry.md @@ -0,0 +1,24 @@ +# Skill Registry — go-migrate + +Generated: 2026-04-21 + +## Compact Rules + +### Go conventions +- Use existing error wrapping pattern: `fmt.Errorf("context: %w", err)` +- Channel-based pipeline — keep goroutine lifecycle clean (close channels in correct order) +- No comments unless non-obvious WHY; no docstrings +- Prefer named returns only when it aids clarity in short functions +- Use `strings.EqualFold` for case-insensitive column name comparison + +### Project conventions +- Config structs live in `internal/app/config/` +- ETL interfaces live in `internal/app/etl/types.go` +- Transformer implementations in `internal/app/etl/transformers/` +- Azure operations via `internal/app/azure/main.go` +- Per-job transformer creation (not shared) when job has storage config + +## User Skills +| Trigger | Skill | +|---------|-------| +| sdd-* | SDD workflow skills | diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 3e289af..9ed7816 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -5,12 +5,12 @@ import ( "sync" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" + dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -119,9 +119,17 @@ func processMigrationJobs( sourceTableAnalyzer := 
table_analyzers.NewMssqlTableAnalyzer(sourceDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) extractor := extractors.NewMssqlExtractor(sourceDb) - transformer := transformers.NewMssqlTransformer() loader := loaders.NewGenericLoader(targetDb) + var azureClient *azure.Client + if config.App.AzureStorage.Enabled { + var err error + azureClient, err = azure.NewClient(config.App.AzureStorage) + if err != nil { + log.Fatalf("Failed to create Azure storage client: %v", err) + } + } + for i := range maxParallelWorkers { wgJobs.Go(func() { for job := range chJobs { @@ -132,7 +140,7 @@ func processMigrationJobs( sourceTableAnalyzer, targetTableAnalyzer, extractor, - transformer, + azureClient, loader, job, targetDb.GetDialect(), diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 2b80571..b87a1f7 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -7,12 +7,14 @@ import ( "sync/atomic" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -38,11 +40,12 @@ func processMigrationJob( sourceTableAnalyzer etl.TableAnalyzer, targetTableAnalyzer etl.TableAnalyzer, extractor etl.Extractor, - transformer etl.Transformer, + azureClient *azure.Client, loader etl.Loader, job config.Job, targetDbType string, ) models.JobResult { + transformer := transformers.NewMssqlTransformer(job.ToStorage, 
job.SourceTable, azureClient) localCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/internal/app/azure/main.go b/internal/app/azure/main.go index e45bb62..7c08bef 100644 --- a/internal/app/azure/main.go +++ b/internal/app/azure/main.go @@ -66,3 +66,22 @@ func (c *Client) UploadBuffer(ctx context.Context, containerName, blobPath strin } return nil } + +func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []byte) (string, error) { + if blobPath == "" || buffer == nil { + return "", ErrInvalidInput + } + + fullPath := blobPath + if c.azureStorageConfig.Prefix != "" { + fullPath, _ = url.JoinPath(c.azureStorageConfig.Prefix, blobPath) + } + + if err := c.UploadBuffer(ctx, c.azureStorageConfig.Container, fullPath, buffer); err != nil { + return "", err + } + + blobEndpoint, _ := url.JoinPath(c.azureStorageConfig.ServiceURL, c.azureStorageConfig.AccountName) + blobURL, _ := url.JoinPath(blobEndpoint, c.azureStorageConfig.Container, fullPath) + return blobURL, nil +} diff --git a/internal/app/etl/transformers/mssql.go b/internal/app/etl/transformers/mssql.go index 7270ebb..0500a6e 100644 --- a/internal/app/etl/transformers/mssql.go +++ b/internal/app/etl/transformers/mssql.go @@ -3,18 +3,32 @@ package transformers import ( "context" "errors" + "fmt" + "strings" "sync" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" + log "github.com/sirupsen/logrus" ) -type MssqlTransformer struct{} +type MssqlTransformer struct { + toStorage config.ToStorageConfig + sourceTable config.SourceTableInfo + azureClient *azure.Client +} -func NewMssqlTransformer() etl.Transformer { - return &MssqlTransformer{} +func NewMssqlTransformer(toStorage 
// computeStorageTransformationPlan builds column transform plans that replace
// binary column values with Azure blob URLs: each configured to_storage column
// in REFERENCE_ONLY mode gets a transform that uploads the []byte value and
// substitutes the resulting blob URL in its place.
//
// Returns nil (no storage transforms) when no Azure client is configured or
// the job declares no storage columns. Unsupported modes and columns missing
// from the source schema are logged and skipped rather than failing the job.
func computeStorageTransformationPlan(
	ctx context.Context,
	azureClient *azure.Client,
	toStorage config.ToStorageConfig,
	sourceColumns []models.ColumnType,
	sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
	if azureClient == nil || len(toStorage.Columns) == 0 {
		return nil
	}

	// Case-insensitive lookup from source column name to its positional index.
	// Upper-cased map keys are the map-friendly equivalent of EqualFold here.
	colIndex := make(map[string]int, len(sourceColumns))
	for i, col := range sourceColumns {
		colIndex[strings.ToUpper(col.Name())] = i
	}

	var plan []etl.ColumnTransformPlan
	for _, storageCol := range toStorage.Columns {
		// NOTE(review): mode comparison is case-sensitive and exact; only
		// "REFERENCE_ONLY" is handled — confirm that is the full mode set.
		if storageCol.Mode != "REFERENCE_ONLY" {
			log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
			continue
		}

		idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
		if !ok {
			log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
			continue
		}

		// Copied to locals so each plan's closure captures this iteration's
		// values (safe either way on Go 1.22+, but explicit is clearer).
		sourceColName := storageCol.Source
		schema := sourceTable.Schema
		table := sourceTable.Table

		plan = append(plan, etl.ColumnTransformPlan{
			Index: idx,
			// Fn runs per cell value; presumably invoked during batch
			// processing in Exec — verify against ColumnTransformPlan's caller.
			Fn: func(v any) (any, error) {
				// NULLs pass through untouched.
				if v == nil {
					return nil, nil
				}
				b, ok := v.([]byte)
				if !ok {
					// Non-binary value: warn and pass through rather than
					// failing the whole batch. Note this logs once per row.
					log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
						schema, table, sourceColName, v)
					return v, nil
				}
				// One blob per cell, keyed by a fresh UUID under
				// schema/table/column so uploads never collide.
				blobPath := fmt.Sprintf("%s/%s/%s/%s.bin", schema, table, sourceColName, uuid.New().String())
				blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
				if err != nil {
					return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
				}
				// The stored value becomes the blob URL (REFERENCE_ONLY).
				return blobURL, nil
			},
		})
	}
	return plan
}