refactor: update extractor methods to support FromJsonColumns; enhance data processing capabilities

This commit is contained in:
2026-05-07 08:17:25 -05:00
parent 46ddd0d6b7
commit a216a8016f
6 changed files with 12 additions and 3 deletions

View File

@@ -158,6 +158,7 @@ func processMigrationJob(
chJobErrors, chJobErrors,
&wgActivePartitions, &wgActivePartitions,
&rowsRead, &rowsRead,
job.SourceTable.FromJsonColumns,
) )
}) })
} }

View File

@@ -56,7 +56,7 @@ type TableInfo struct {
type SourceTableInfo struct { type SourceTableInfo struct {
TableInfo `yaml:",inline"` TableInfo `yaml:",inline"`
PrimaryKey string `yaml:"primary_key"` PrimaryKey string `yaml:"primary_key"`
FromJsonConfig []FromJsonItem `yaml:"from_json"` FromJsonColumns []FromJsonItem `yaml:"from_json"`
} }
type TargetTableInfo struct { type TargetTableInfo struct {

View File

@@ -188,6 +188,8 @@ func buildExtractQueryMssql(q ExtractionQuery) (string, error) {
hasRegularColumns := len(q.Columns) > 0 hasRegularColumns := len(q.Columns) > 0
hasJsonColumns := len(q.FromJsonColumns) > 0 hasJsonColumns := len(q.FromJsonColumns) > 0
// logrus.Debugf("Extraction query: %+v", q)
resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns)) resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
if hasJsonColumns { if hasJsonColumns {
for _, jsonConfig := range q.FromJsonColumns { for _, jsonConfig := range q.FromJsonColumns {

View File

@@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume(
chErrorsOut chan<- custom_errors.JobError, chErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup, wgActivePartitions *sync.WaitGroup,
rowsRead *int64, rowsRead *int64,
fromJsonColumns []config.FromJsonItem,
) { ) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey) return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
@@ -65,6 +66,7 @@ func (ex *GenericExtractor) Consume(
indexPrimaryKey, indexPrimaryKey,
retryConfig, retryConfig,
chBatchesOut, chBatchesOut,
fromJsonColumns,
) )
wgActivePartitions.Done() wgActivePartitions.Done()

View File

@@ -22,6 +22,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
indexPrimaryKey int, indexPrimaryKey int,
retryConfig config.RetryConfig, retryConfig config.RetryConfig,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
fromJsonColumns []config.FromJsonItem,
) (int64, error) { ) (int64, error) {
var totalRowsRead int64 var totalRowsRead int64
currentParitition := partition currentParitition := partition
@@ -35,6 +36,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
currentParitition, currentParitition,
indexPrimaryKey, indexPrimaryKey,
chBatchesOut, chBatchesOut,
fromJsonColumns,
) )
// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table) // logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)
totalRowsRead += rowsRead totalRowsRead += rowsRead

View File

@@ -47,6 +47,7 @@ func (ex *GenericExtractor) ProcessPartition(
partition models.Partition, partition models.Partition,
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
fromJsonColumns []config.FromJsonItem,
) (int64, error) { ) (int64, error) {
query := dbwrapper.ExtractionQuery{ query := dbwrapper.ExtractionQuery{
Schema: tableInfo.Schema, Schema: tableInfo.Schema,
@@ -63,6 +64,7 @@ func (ex *GenericExtractor) ProcessPartition(
IsInclusive: partition.Range.IsMaxInclusive, IsInclusive: partition.Range.IsMaxInclusive,
Value: partition.Range.Max, Value: partition.Range.Max,
}, },
FromJsonColumns: fromJsonColumns,
} }
// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table) // logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)