feat: integrate Azure storage handling in migration process; update transformers and job processing logic

commit 3b1371a270
parent bb7b35619a
Date: 2026-04-21 14:15:20 -05:00

5 changed files with 135 additions and 8 deletions

.atl/skill-registry.md (new file)
@@ -0,0 +1,24 @@
# Skill Registry — go-migrate
Generated: 2026-04-21
## Compact Rules
### Go conventions
- Use existing error wrapping pattern: `fmt.Errorf("context: %w", err)`
- Channel-based pipeline — keep goroutine lifecycle clean (close channels in correct order)
- No comments unless non-obvious WHY; no docstrings
- Prefer named returns only when it aids clarity in short functions
- Use `strings.EqualFold` for case-insensitive column name comparison
### Project conventions
- Config structs live in `internal/app/config/`
- ETL interfaces live in `internal/app/etl/types.go`
- Transformer implementations in `internal/app/etl/transformers/`
- Azure operations via `internal/app/azure/main.go`
- Per-job transformer creation (not shared) when job has storage config
## User Skills
| Trigger | Skill |
|---------|-------|
| sdd-* | SDD workflow skills |
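
Two of the Compact Rules above carry most of the weight in this commit: `%w` error wrapping and case-insensitive column matching. A minimal sketch of both, where `findColumn`, `ErrColumnNotFound`, and the package name are hypothetical illustrations, not code from this repository:

```go
// Illustration of two Compact Rules from the registry; all names here are
// hypothetical, not repository code.
package example

import (
	"errors"
	"fmt"
	"strings"
)

var ErrColumnNotFound = errors.New("column not found")

// findColumn resolves a column by name, case-insensitively.
func findColumn(columns []string, name string) (int, error) {
	for i, col := range columns {
		if strings.EqualFold(col, name) { // case-insensitive comparison rule
			return i, nil
		}
	}
	// Error wrapping rule: fmt.Errorf("context: %w", err)
	return -1, fmt.Errorf("resolving column %q: %w", name, ErrColumnNotFound)
}
```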

@@ -5,12 +5,12 @@ import (
 	"sync"
 	"time"

+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
-	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
+	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
-	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
@@ -119,9 +119,17 @@ func processMigrationJobs(
 	sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
 	targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
 	extractor := extractors.NewMssqlExtractor(sourceDb)
-	transformer := transformers.NewMssqlTransformer()
 	loader := loaders.NewGenericLoader(targetDb)

+	var azureClient *azure.Client
+	if config.App.AzureStorage.Enabled {
+		var err error
+		azureClient, err = azure.NewClient(config.App.AzureStorage)
+		if err != nil {
+			log.Fatalf("Failed to create Azure storage client: %v", err)
+		}
+	}
+
 	for i := range maxParallelWorkers {
 		wgJobs.Go(func() {
 			for job := range chJobs {
@@ -132,7 +140,7 @@
 					sourceTableAnalyzer,
 					targetTableAnalyzer,
 					extractor,
-					transformer,
+					azureClient,
 					loader,
 					job,
 					targetDb.GetDialect(),
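
The `config.App.AzureStorage` block the orchestrator checks here is read again in the Azure client hunk further down (`ServiceURL`, `AccountName`, `Container`, `Prefix`). A sketch of the shape those accesses imply; the struct name, field layout, and yaml tags are assumptions, not the actual definition in `internal/app/config/`:

```go
// Assumed shape of the Azure storage config, inferred only from the fields
// this commit reads; the name and tags are illustrative guesses.
package config

type AzureStorageConfig struct {
	Enabled     bool   `yaml:"enabled"`      // gates client creation in processMigrationJobs
	ServiceURL  string `yaml:"service_url"`  // e.g. http://127.0.0.1:10000 for Azurite
	AccountName string `yaml:"account_name"` // joined onto ServiceURL for the blob endpoint
	Container   string `yaml:"container"`    // target container for UploadBuffer
	Prefix      string `yaml:"prefix"`       // optional path prefix for all blobs
}
```

Building the client once and handing the shared `*azure.Client` to every worker keeps the per-job work cheap; only the transformer is created per job, per the registry rule.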

@@ -7,12 +7,14 @@ import (
 	"sync/atomic"
 	"time"

+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
 	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
@@ -38,11 +40,12 @@ func processMigrationJob(
 	sourceTableAnalyzer etl.TableAnalyzer,
 	targetTableAnalyzer etl.TableAnalyzer,
 	extractor etl.Extractor,
-	transformer etl.Transformer,
+	azureClient *azure.Client,
 	loader etl.Loader,
 	job config.Job,
 	targetDbType string,
 ) models.JobResult {
+	transformer := transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
 	localCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
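
In line with the registry rule on per-job transformer creation, the transformer now carries job state (`job.ToStorage`, `job.SourceTable`) and is constructed inside `processMigrationJob` instead of being shared across workers. The per-job shapes those arguments imply, reconstructed from the field accesses in the transformer hunk below; the exact layout is an assumption:

```go
// Assumed per-job config shapes, inferred from the accesses this commit
// makes (job.ToStorage, job.SourceTable, storageCol.Source/Mode,
// sourceTable.Schema/Table); layout is illustrative only.
package config

type SourceTableInfo struct {
	Schema string // first blob path segment
	Table  string // second blob path segment
}

type ToStorageColumn struct {
	Source string // source column whose bytes move to blob storage
	Mode   string // only "REFERENCE_ONLY" is handled in this commit
}

type ToStorageConfig struct {
	Columns []ToStorageColumn
}
```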

@@ -66,3 +66,22 @@ func (c *Client) UploadBuffer(ctx context.Context, containerName, blobPath string
 	}
 	return nil
 }
+
+func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []byte) (string, error) {
+	if blobPath == "" || buffer == nil {
+		return "", ErrInvalidInput
+	}
+
+	fullPath := blobPath
+	if c.azureStorageConfig.Prefix != "" {
+		fullPath, _ = url.JoinPath(c.azureStorageConfig.Prefix, blobPath)
+	}
+
+	if err := c.UploadBuffer(ctx, c.azureStorageConfig.Container, fullPath, buffer); err != nil {
+		return "", err
+	}
+
+	blobEndpoint, _ := url.JoinPath(c.azureStorageConfig.ServiceURL, c.azureStorageConfig.AccountName)
+	blobURL, _ := url.JoinPath(blobEndpoint, c.azureStorageConfig.Container, fullPath)
+	return blobURL, nil
+}
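
A standalone trace of the joins `UploadAndGetURL` performs, under assumed Azurite-style values (the config values are examples, not project settings); this mirrors the method rather than calling it. Note the method discards `url.JoinPath` errors, which holds up only while config values are valid URL segments:

```go
// Runnable trace of the path and URL construction above, with assumed
// values; the account, container, and prefix are hypothetical.
package main

import (
	"fmt"
	"net/url"
)

func main() {
	serviceURL, accountName := "http://127.0.0.1:10000", "devstoreaccount1"
	container, prefix := "migrations", "blobs"
	blobPath := "dbo/Documents/Content/1f3c.bin"

	fullPath, _ := url.JoinPath(prefix, blobPath)            // blobs/dbo/Documents/Content/1f3c.bin
	blobEndpoint, _ := url.JoinPath(serviceURL, accountName) // http://127.0.0.1:10000/devstoreaccount1
	blobURL, _ := url.JoinPath(blobEndpoint, container, fullPath)
	fmt.Println(blobURL)
	// http://127.0.0.1:10000/devstoreaccount1/migrations/blobs/dbo/Documents/Content/1f3c.bin
}
```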

@@ -3,18 +3,32 @@ package transformers
 import (
 	"context"
 	"errors"
+	"fmt"
+	"strings"
 	"sync"
 	"time"

+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
+	"github.com/google/uuid"
+	log "github.com/sirupsen/logrus"
 )

-type MssqlTransformer struct{}
+type MssqlTransformer struct {
+	toStorage   config.ToStorageConfig
+	sourceTable config.SourceTableInfo
+	azureClient *azure.Client
+}

-func NewMssqlTransformer() etl.Transformer {
-	return &MssqlTransformer{}
+func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.SourceTableInfo, azureClient *azure.Client) etl.Transformer {
+	return &MssqlTransformer{
+		toStorage:   toStorage,
+		sourceTable: sourceTable,
+		azureClient: azureClient,
+	}
 }
@@ -60,6 +74,63 @@ func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
 	return plan
 }

+func computeStorageTransformationPlan(
+	ctx context.Context,
+	azureClient *azure.Client,
+	toStorage config.ToStorageConfig,
+	sourceColumns []models.ColumnType,
+	sourceTable config.SourceTableInfo,
+) []etl.ColumnTransformPlan {
+	if azureClient == nil || len(toStorage.Columns) == 0 {
+		return nil
+	}
+
+	colIndex := make(map[string]int, len(sourceColumns))
+	for i, col := range sourceColumns {
+		colIndex[strings.ToUpper(col.Name())] = i
+	}
+
+	var plan []etl.ColumnTransformPlan
+	for _, storageCol := range toStorage.Columns {
+		if storageCol.Mode != "REFERENCE_ONLY" {
+			log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
+			continue
+		}
+
+		idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
+		if !ok {
+			log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
+			continue
+		}
+
+		sourceColName := storageCol.Source
+		schema := sourceTable.Schema
+		table := sourceTable.Table
+
+		plan = append(plan, etl.ColumnTransformPlan{
+			Index: idx,
+			Fn: func(v any) (any, error) {
+				if v == nil {
+					return nil, nil
+				}
+
+				b, ok := v.([]byte)
+				if !ok {
+					log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
+						schema, table, sourceColName, v)
+					return v, nil
+				}
+
+				blobPath := fmt.Sprintf("%s/%s/%s/%s.bin", schema, table, sourceColName, uuid.New().String())
+				blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
+				if err != nil {
+					return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
+				}
+
+				return blobURL, nil
+			},
+		})
+	}
+
+	return plan
+}

 const processBatchCtxCheck = 4096

 func (mssqlTr *MssqlTransformer) ProcessBatch(
@@ -100,6 +171,8 @@ func (mssqlTr *MssqlTransformer) Exec(
 	wgActiveBatches *sync.WaitGroup,
 ) {
 	transformationPlan := computeTransformationPlan(columns)
+	storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
+	transformationPlan = append(transformationPlan, storagePlan...)

 	for {
 		if ctx.Err() != nil {
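
The storage plan entries append to the type-conversion entries from `computeTransformationPlan` and run per cell during batch processing. A minimal sketch of how a plan slice applies to one row, assuming only the two fields this commit uses (`Index`, `Fn`); the real transformer's batch loop, context checks, and error routing are more involved:

```go
// Minimal sketch of applying plan entries to one row. ColumnTransformPlan is
// redeclared here with only the fields this commit uses, and applyPlan is
// hypothetical, not the transformer's actual ProcessBatch loop.
package example

import "fmt"

type ColumnTransformPlan struct {
	Index int
	Fn    func(v any) (any, error)
}

func applyPlan(row []any, plan []ColumnTransformPlan) error {
	for _, p := range plan {
		v, err := p.Fn(row[p.Index]) // e.g. upload []byte, get back a blob URL
		if err != nil {
			return fmt.Errorf("transforming column %d: %w", p.Index, err)
		}
		row[p.Index] = v // REFERENCE_ONLY: the cell now holds the blob URL
	}
	return nil
}
```

For a REFERENCE_ONLY column, the closure above uploads each non-nil `[]byte` cell to `schema/table/column/<uuid>.bin` and stores the returned URL in the target row, so the migrated table carries a reference instead of the raw bytes; nil cells pass through, and non-`[]byte` cells pass through with a warning.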