From a216a8016f8788791207835542b2268d6c340527 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Thu, 7 May 2026 08:17:25 -0500 Subject: [PATCH] refactor: update extractor methods to support FromJsonColumns; enhance data processing capabilities --- cmd/go_migrate/process.go | 1 + internal/app/config/migration.go | 6 +++--- internal/app/db-wrapper/mssql.go | 2 ++ internal/app/etl/extractors/consume.go | 2 ++ internal/app/etl/extractors/process-with-retries.go | 2 ++ internal/app/etl/extractors/process.go | 2 ++ 6 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 17a7440..cd1f85b 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -158,6 +158,7 @@ func processMigrationJob( chJobErrors, &wgActivePartitions, &rowsRead, + job.SourceTable.FromJsonColumns, ) }) } diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index 9400b50..cb5dd41 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -54,9 +54,9 @@ type TableInfo struct { } type SourceTableInfo struct { - TableInfo `yaml:",inline"` - PrimaryKey string `yaml:"primary_key"` - FromJsonConfig []FromJsonItem `yaml:"from_json"` + TableInfo `yaml:",inline"` + PrimaryKey string `yaml:"primary_key"` + FromJsonColumns []FromJsonItem `yaml:"from_json"` } type TargetTableInfo struct { diff --git a/internal/app/db-wrapper/mssql.go b/internal/app/db-wrapper/mssql.go index f6ef39e..e3b2090 100644 --- a/internal/app/db-wrapper/mssql.go +++ b/internal/app/db-wrapper/mssql.go @@ -188,6 +188,8 @@ func buildExtractQueryMssql(q ExtractionQuery) (string, error) { hasRegularColumns := len(q.Columns) > 0 hasJsonColumns := len(q.FromJsonColumns) > 0 + // logrus.Debugf("Extraction query: %+v", q) + resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns)) if hasJsonColumns { for _, jsonConfig := range q.FromJsonColumns { diff --git a/internal/app/etl/extractors/consume.go b/internal/app/etl/extractors/consume.go index 9884288..da5cab0 100644 --- a/internal/app/etl/extractors/consume.go +++ b/internal/app/etl/extractors/consume.go @@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume( chErrorsOut chan<- custom_errors.JobError, wgActivePartitions *sync.WaitGroup, rowsRead *int64, + fromJsonColumns []config.FromJsonItem, ) { indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { return strings.EqualFold(col.Name(), tableInfo.PrimaryKey) @@ -65,6 +66,7 @@ func (ex *GenericExtractor) Consume( indexPrimaryKey, retryConfig, chBatchesOut, + fromJsonColumns, ) wgActivePartitions.Done() diff --git a/internal/app/etl/extractors/process-with-retries.go b/internal/app/etl/extractors/process-with-retries.go index 5837306..c04a988 100644 --- a/internal/app/etl/extractors/process-with-retries.go +++ b/internal/app/etl/extractors/process-with-retries.go @@ -22,6 +22,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries( indexPrimaryKey int, retryConfig config.RetryConfig, chBatchesOut chan<- models.Batch, + fromJsonColumns []config.FromJsonItem, ) (int64, error) { var totalRowsRead int64 currentParitition := partition @@ -35,6 +36,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries( currentParitition, indexPrimaryKey, chBatchesOut, + fromJsonColumns, ) // logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table) totalRowsRead += rowsRead diff --git a/internal/app/etl/extractors/process.go b/internal/app/etl/extractors/process.go index d624551..1a56912 100644 --- a/internal/app/etl/extractors/process.go +++ b/internal/app/etl/extractors/process.go @@ -47,6 +47,7 @@ func (ex *GenericExtractor) ProcessPartition( partition models.Partition, indexPrimaryKey int, chBatchesOut chan<- models.Batch, + fromJsonColumns []config.FromJsonItem, ) (int64, error) { query := dbwrapper.ExtractionQuery{ Schema: tableInfo.Schema, @@ -63,6 +64,7 @@ func (ex *GenericExtractor) ProcessPartition( IsInclusive: partition.Range.IsMaxInclusive, Value: partition.Range.Max, }, + FromJsonColumns: fromJsonColumns, } // logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)