From 078445810697871677292c94f40b1f228eb89032 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Mon, 11 May 2026 11:04:36 -0500 Subject: [PATCH] refactor: add prefix support for storage column configuration and update blob path generation --- config.yaml | 1 + internal/app/azure/main.go | 22 ++++++++++++---------- internal/app/config/migration.go | 1 + internal/app/etl/transformers/plan.go | 6 +++--- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/config.yaml b/config.yaml index cf52338..169ce06 100644 --- a/config.yaml +++ b/config.yaml @@ -71,6 +71,7 @@ jobs: - source: DATA target: FILE_URL mode: REFERENCE_ONLY + prefix: Infraestructura/SITE_HOLDER__ATTACH batches_per_partition: 20 max_extractors: 32 extractor_batch_size: 1 diff --git a/internal/app/azure/main.go b/internal/app/azure/main.go index 7c08bef..6b32a84 100644 --- a/internal/app/azure/main.go +++ b/internal/app/azure/main.go @@ -4,10 +4,13 @@ import ( "context" "errors" "fmt" + "net/http" "net/url" + "path" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" ) var ( @@ -72,16 +75,15 @@ func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer [] return "", ErrInvalidInput } - fullPath := blobPath - if c.azureStorageConfig.Prefix != "" { - fullPath, _ = url.JoinPath(c.azureStorageConfig.Prefix, blobPath) + fullPath := path.Join(c.azureStorageConfig.Prefix, blobPath) + + contentType := http.DetectContentType(buffer) + opts := &azblob.UploadBufferOptions{ + HTTPHeaders: &blob.HTTPHeaders{BlobContentType: &contentType}, + } + if _, err := c.client.UploadBuffer(ctx, c.azureStorageConfig.Container, fullPath, buffer, opts); err != nil { + return "", fmt.Errorf("uploading blob %s: %w", fullPath, err) } - if err := c.UploadBuffer(ctx, c.azureStorageConfig.Container, fullPath, buffer); err != nil { - return "", err - } - - blobEndpoint, _ := url.JoinPath(c.azureStorageConfig.ServiceURL, c.azureStorageConfig.AccountName) - blobURL, _ := url.JoinPath(blobEndpoint, c.azureStorageConfig.Container, fullPath) - return blobURL, nil + return fullPath, nil } diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index bb90f36..f30646d 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -20,6 +20,7 @@ type ToStorageColumnConfig struct { Source string `yaml:"source"` Target string `yaml:"target"` Mode string `yaml:"mode"` + Prefix string `yaml:"prefix"` } type ToStorageConfig struct { diff --git a/internal/app/etl/transformers/plan.go b/internal/app/etl/transformers/plan.go index 5758dba..2cea12b 100644 --- a/internal/app/etl/transformers/plan.go +++ b/internal/app/etl/transformers/plan.go @@ -3,6 +3,7 @@ package transformers import ( "context" "fmt" + "path" "strings" "time" @@ -99,12 +100,11 @@ func computeStorageTransformationPlan( } b, ok := v.([]byte) if !ok { - logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through", - schema, table, sourceColName, v) + logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through", schema, table, sourceColName, v) return v, nil } // start := time.Now() - blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String()) + blobPath := path.Join(storageCol.Prefix, uuid.New().String()) blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b) if err != nil { return nil, &custom_errors.JobError{