From 52fe083ab7798ff49c1f8a9def938657a1251fd5 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Sun, 26 Apr 2026 19:39:14 -0500 Subject: [PATCH] refactor: add consume and process methods for GenericExtractor; streamline data extraction logic --- internal/app/etl/extractors/consume.go | 99 +++++++++++ internal/app/etl/extractors/main.go | 221 ------------------------- internal/app/etl/extractors/process.go | 137 +++++++++++++++ 3 files changed, 236 insertions(+), 221 deletions(-) create mode 100644 internal/app/etl/extractors/consume.go create mode 100644 internal/app/etl/extractors/process.go diff --git a/internal/app/etl/extractors/consume.go b/internal/app/etl/extractors/consume.go new file mode 100644 index 0000000..2d17da5 --- /dev/null +++ b/internal/app/etl/extractors/consume.go @@ -0,0 +1,99 @@ +package extractors + +import ( + "context" + "errors" + "slices" + "strings" + "sync" + "sync/atomic" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +func (ex *GenericExtractor) Consume( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + batchSize int, + chPartitionsIn <-chan models.Partition, + chBatchesOut chan<- models.Batch, + chErrorsOut chan<- custom_errors.ExtractorError, + chJobErrorsOut chan<- custom_errors.JobError, + wgActivePartitions *sync.WaitGroup, + rowsRead *int64, +) { + indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { + return strings.EqualFold(col.Name(), tableInfo.PrimaryKey) + }) + + if indexPrimaryKey == -1 { + select { + case <-ctx.Done(): + return + case chJobErrorsOut <- custom_errors.JobError{ + ShouldCancelJob: true, + Msg: "Primary key not found in provided columns", + }: + } + + return + } + + for { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + return + case partition, ok := <-chPartitionsIn: + if !ok { + return + } + + rowsReadResult, err := ex.ProcessPartition( + ctx, + tableInfo, + columns, + batchSize, + partition, + indexPrimaryKey, + chBatchesOut, + ) + + if rowsReadResult > 0 { + atomic.AddInt64(rowsRead, int64(rowsReadResult)) + } + + if err != nil { + if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok { + select { + case <-ctx.Done(): + return + case chErrorsOut <- *exError: + } + } else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { + select { + case <-ctx.Done(): + return + case chJobErrorsOut <- *jobError: + } + } else { + select { + case <-ctx.Done(): + return + case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: + } + } + + continue + } + + wgActivePartitions.Done() + } + } +} diff --git a/internal/app/etl/extractors/main.go b/internal/app/etl/extractors/main.go index 4a7295c..22702bc 100644 --- a/internal/app/etl/extractors/main.go +++ b/internal/app/etl/extractors/main.go @@ -1,20 +1,7 @@ package extractors import ( - "context" - "errors" - "fmt" - "slices" - "strings" - "sync" - "sync/atomic" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/google/uuid" ) type GenericExtractor struct { @@ -24,211 +11,3 @@ type GenericExtractor struct { func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor { return GenericExtractor{db: db} } - -func errorFromLastRow( - lastRow models.UnknownRowValues, - indexPrimaryKey int, - partition models.Partition, - previousError error, -) error { - lastIdRawValue := lastRow[indexPrimaryKey] - - lastId, ok := convert.ToInt64(lastIdRawValue) - if !ok { - currentPartition := partition - currentPartition.RetryCounter = 3 - return &custom_errors.ExtractorError{ - Partition: currentPartition, - HasLastId: true, - Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), - } - } - - return &custom_errors.ExtractorError{ - Partition: partition, - HasLastId: true, - LastId: lastId, - Msg: previousError.Error(), - } -} - -func (ex *GenericExtractor) ProcessPartition( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - batchSize int, - partition models.Partition, - indexPrimaryKey int, - chBatchesOut chan<- models.Batch, -) (int, error) { - rowsRead := 0 - query := dbwrapper.ExtractionQuery{ - Schema: tableInfo.Schema, - Table: tableInfo.Table, - PrimaryKey: tableInfo.PrimaryKey, - LowerLimit: dbwrapper.ExtractorQueryLimit{ - IsValid: partition.HasRange && partition.Range.Min > 0, - IsInclusive: partition.Range.IsMinInclusive, - Value: partition.Range.Min, - }, - UpperLimit: dbwrapper.ExtractorQueryLimit{ - IsValid: partition.HasRange && partition.Range.Max > 0, - IsInclusive: partition.Range.IsMaxInclusive, - Value: partition.Range.Max, - }, - } - - rows, err := ex.db.QueryFromObject(ctx, query) - - if err != nil { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - defer rows.Close() - - batchRows := make([]models.UnknownRowValues, 0, batchSize) - - for rows.Next() { - rowValues := make([]any, len(columns)) - scanArgs := make([]any, len(columns)) - - for i := range rowValues { - scanArgs[i] = &rowValues[i] - } - - if err := rows.Scan(scanArgs...); err != nil { - if len(batchRows) == 0 { - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - - lastRow := batchRows[len(batchRows)-1] - - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - - return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) - } - rowsRead++ - - batchRows = append(batchRows, rowValues) - if len(batchRows) >= batchSize { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - - batchRows = make([]models.UnknownRowValues, 0, batchSize) - } - } - - if err := rows.Err(); err != nil { - if errors.Is(err, ctx.Err()) { - return rowsRead, ctx.Err() - } - - if len(batchRows) > 0 { - lastRow := batchRows[len(batchRows)-1] - return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) - } - - return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} - } - - if len(batchRows) > 0 { - select { - case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: - case <-ctx.Done(): - return rowsRead, ctx.Err() - } - } - - return rowsRead, nil -} - -func (ex *GenericExtractor) Consume( - ctx context.Context, - tableInfo config.SourceTableInfo, - columns []models.ColumnType, - batchSize int, - chPartitionsIn <-chan models.Partition, - chBatchesOut chan<- models.Batch, - chErrorsOut chan<- custom_errors.ExtractorError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActivePartitions *sync.WaitGroup, - rowsRead *int64, -) { - indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { - return strings.EqualFold(col.Name(), tableInfo.PrimaryKey) - }) - - if indexPrimaryKey == -1 { - select { - case <-ctx.Done(): - return - case chJobErrorsOut <- custom_errors.JobError{ - ShouldCancelJob: true, - Msg: "Primary key not found in provided columns", - }: - } - - return - } - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - case partition, ok := <-chPartitionsIn: - if !ok { - return - } - - rowsReadResult, err := ex.ProcessPartition( - ctx, - tableInfo, - columns, - batchSize, - partition, - indexPrimaryKey, - chBatchesOut, - ) - - if rowsReadResult > 0 { - atomic.AddInt64(rowsRead, int64(rowsReadResult)) - } - - if err != nil { - if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok { - select { - case <-ctx.Done(): - return - case chErrorsOut <- *exError: - } - } else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { - select { - case <-ctx.Done(): - return - case chJobErrorsOut <- *jobError: - } - } else { - select { - case <-ctx.Done(): - return - case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: - } - } - - continue - } - - wgActivePartitions.Done() - } - } -} diff --git a/internal/app/etl/extractors/process.go b/internal/app/etl/extractors/process.go new file mode 100644 index 0000000..2f277ce --- /dev/null +++ b/internal/app/etl/extractors/process.go @@ -0,0 +1,137 @@ +package extractors + +import ( + "context" + "errors" + "fmt" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/google/uuid" +) + +func errorFromLastRow( + lastRow models.UnknownRowValues, + indexPrimaryKey int, + partition models.Partition, + previousError error, +) error { + lastIdRawValue := lastRow[indexPrimaryKey] + + lastId, ok := convert.ToInt64(lastIdRawValue) + if !ok { + currentPartition := partition + currentPartition.RetryCounter = 3 + return &custom_errors.ExtractorError{ + Partition: currentPartition, + HasLastId: true, + Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), + } + } + + return &custom_errors.ExtractorError{ + Partition: partition, + HasLastId: true, + LastId: lastId, + Msg: previousError.Error(), + } +} + +func (ex *GenericExtractor) ProcessPartition( + ctx context.Context, + tableInfo config.SourceTableInfo, + columns []models.ColumnType, + batchSize int, + partition models.Partition, + indexPrimaryKey int, + chBatchesOut chan<- models.Batch, +) (int, error) { + rowsRead := 0 + query := dbwrapper.ExtractionQuery{ + Schema: tableInfo.Schema, + Table: tableInfo.Table, + PrimaryKey: tableInfo.PrimaryKey, + LowerLimit: dbwrapper.ExtractorQueryLimit{ + IsValid: partition.HasRange && partition.Range.Min > 0, + IsInclusive: partition.Range.IsMinInclusive, + Value: partition.Range.Min, + }, + UpperLimit: dbwrapper.ExtractorQueryLimit{ + IsValid: partition.HasRange && partition.Range.Max > 0, + IsInclusive: partition.Range.IsMaxInclusive, + Value: partition.Range.Max, + }, + } + + rows, err := ex.db.QueryFromObject(ctx, query) + + if err != nil { + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + } + defer rows.Close() + + batchRows := make([]models.UnknownRowValues, 0, batchSize) + + for rows.Next() { + rowValues := make([]any, len(columns)) + scanArgs := make([]any, len(columns)) + + for i := range rowValues { + scanArgs[i] = &rowValues[i] + } + + if err := rows.Scan(scanArgs...); err != nil { + if len(batchRows) == 0 { + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + } + + lastRow := batchRows[len(batchRows)-1] + + select { + case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: + case <-ctx.Done(): + return rowsRead, ctx.Err() + } + + return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) + } + rowsRead++ + + batchRows = append(batchRows, rowValues) + if len(batchRows) >= batchSize { + select { + case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: + case <-ctx.Done(): + return rowsRead, ctx.Err() + } + + batchRows = make([]models.UnknownRowValues, 0, batchSize) + } + } + + if err := rows.Err(); err != nil { + if errors.Is(err, ctx.Err()) { + return rowsRead, ctx.Err() + } + + if len(batchRows) > 0 { + lastRow := batchRows[len(batchRows)-1] + return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err) + } + + return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} + } + + if len(batchRows) > 0 { + select { + case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}: + case <-ctx.Done(): + return rowsRead, ctx.Err() + } + } + + return rowsRead, nil +}