refactor: add consume and process methods for GenericExtractor; streamline data extraction logic
This commit is contained in:
99
internal/app/etl/extractors/consume.go
Normal file
99
internal/app/etl/extractors/consume.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
func (ex *GenericExtractor) Consume(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
chPartitionsIn <-chan models.Partition,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chErrorsOut chan<- custom_errors.ExtractorError,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActivePartitions *sync.WaitGroup,
|
||||
rowsRead *int64,
|
||||
) {
|
||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
||||
})
|
||||
|
||||
if indexPrimaryKey == -1 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chJobErrorsOut <- custom_errors.JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: "Primary key not found in provided columns",
|
||||
}:
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case partition, ok := <-chPartitionsIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
rowsReadResult, err := ex.ProcessPartition(
|
||||
ctx,
|
||||
tableInfo,
|
||||
columns,
|
||||
batchSize,
|
||||
partition,
|
||||
indexPrimaryKey,
|
||||
chBatchesOut,
|
||||
)
|
||||
|
||||
if rowsReadResult > 0 {
|
||||
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- *exError:
|
||||
}
|
||||
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chJobErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
wgActivePartitions.Done()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,20 +1,7 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type GenericExtractor struct {
|
||||
@@ -24,211 +11,3 @@ type GenericExtractor struct {
|
||||
func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor {
|
||||
return GenericExtractor{db: db}
|
||||
}
|
||||
|
||||
func errorFromLastRow(
|
||||
lastRow models.UnknownRowValues,
|
||||
indexPrimaryKey int,
|
||||
partition models.Partition,
|
||||
previousError error,
|
||||
) error {
|
||||
lastIdRawValue := lastRow[indexPrimaryKey]
|
||||
|
||||
lastId, ok := convert.ToInt64(lastIdRawValue)
|
||||
if !ok {
|
||||
currentPartition := partition
|
||||
currentPartition.RetryCounter = 3
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: currentPartition,
|
||||
HasLastId: true,
|
||||
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: partition,
|
||||
HasLastId: true,
|
||||
LastId: lastId,
|
||||
Msg: previousError.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
func (ex *GenericExtractor) ProcessPartition(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int, error) {
|
||||
rowsRead := 0
|
||||
query := dbwrapper.ExtractionQuery{
|
||||
Schema: tableInfo.Schema,
|
||||
Table: tableInfo.Table,
|
||||
PrimaryKey: tableInfo.PrimaryKey,
|
||||
LowerLimit: dbwrapper.ExtractorQueryLimit{
|
||||
IsValid: partition.HasRange && partition.Range.Min > 0,
|
||||
IsInclusive: partition.Range.IsMinInclusive,
|
||||
Value: partition.Range.Min,
|
||||
},
|
||||
UpperLimit: dbwrapper.ExtractorQueryLimit{
|
||||
IsValid: partition.HasRange && partition.Range.Max > 0,
|
||||
IsInclusive: partition.Range.IsMaxInclusive,
|
||||
Value: partition.Range.Max,
|
||||
},
|
||||
}
|
||||
|
||||
rows, err := ex.db.QueryFromObject(ctx, query)
|
||||
|
||||
if err != nil {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
||||
|
||||
for rows.Next() {
|
||||
rowValues := make([]any, len(columns))
|
||||
scanArgs := make([]any, len(columns))
|
||||
|
||||
for i := range rowValues {
|
||||
scanArgs[i] = &rowValues[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanArgs...); err != nil {
|
||||
if len(batchRows) == 0 {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
rowsRead++
|
||||
|
||||
batchRows = append(batchRows, rowValues)
|
||||
if len(batchRows) >= batchSize {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
if errors.Is(err, ctx.Err()) {
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
return rowsRead, nil
|
||||
}
|
||||
|
||||
func (ex *GenericExtractor) Consume(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
chPartitionsIn <-chan models.Partition,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chErrorsOut chan<- custom_errors.ExtractorError,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActivePartitions *sync.WaitGroup,
|
||||
rowsRead *int64,
|
||||
) {
|
||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
||||
})
|
||||
|
||||
if indexPrimaryKey == -1 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chJobErrorsOut <- custom_errors.JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: "Primary key not found in provided columns",
|
||||
}:
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case partition, ok := <-chPartitionsIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
rowsReadResult, err := ex.ProcessPartition(
|
||||
ctx,
|
||||
tableInfo,
|
||||
columns,
|
||||
batchSize,
|
||||
partition,
|
||||
indexPrimaryKey,
|
||||
chBatchesOut,
|
||||
)
|
||||
|
||||
if rowsReadResult > 0 {
|
||||
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- *exError:
|
||||
}
|
||||
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chJobErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
wgActivePartitions.Done()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
137
internal/app/etl/extractors/process.go
Normal file
137
internal/app/etl/extractors/process.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func errorFromLastRow(
|
||||
lastRow models.UnknownRowValues,
|
||||
indexPrimaryKey int,
|
||||
partition models.Partition,
|
||||
previousError error,
|
||||
) error {
|
||||
lastIdRawValue := lastRow[indexPrimaryKey]
|
||||
|
||||
lastId, ok := convert.ToInt64(lastIdRawValue)
|
||||
if !ok {
|
||||
currentPartition := partition
|
||||
currentPartition.RetryCounter = 3
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: currentPartition,
|
||||
HasLastId: true,
|
||||
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: partition,
|
||||
HasLastId: true,
|
||||
LastId: lastId,
|
||||
Msg: previousError.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
func (ex *GenericExtractor) ProcessPartition(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int, error) {
|
||||
rowsRead := 0
|
||||
query := dbwrapper.ExtractionQuery{
|
||||
Schema: tableInfo.Schema,
|
||||
Table: tableInfo.Table,
|
||||
PrimaryKey: tableInfo.PrimaryKey,
|
||||
LowerLimit: dbwrapper.ExtractorQueryLimit{
|
||||
IsValid: partition.HasRange && partition.Range.Min > 0,
|
||||
IsInclusive: partition.Range.IsMinInclusive,
|
||||
Value: partition.Range.Min,
|
||||
},
|
||||
UpperLimit: dbwrapper.ExtractorQueryLimit{
|
||||
IsValid: partition.HasRange && partition.Range.Max > 0,
|
||||
IsInclusive: partition.Range.IsMaxInclusive,
|
||||
Value: partition.Range.Max,
|
||||
},
|
||||
}
|
||||
|
||||
rows, err := ex.db.QueryFromObject(ctx, query)
|
||||
|
||||
if err != nil {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
||||
|
||||
for rows.Next() {
|
||||
rowValues := make([]any, len(columns))
|
||||
scanArgs := make([]any, len(columns))
|
||||
|
||||
for i := range rowValues {
|
||||
scanArgs[i] = &rowValues[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanArgs...); err != nil {
|
||||
if len(batchRows) == 0 {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
rowsRead++
|
||||
|
||||
batchRows = append(batchRows, rowValues)
|
||||
if len(batchRows) >= batchSize {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
if errors.Is(err, ctx.Err()) {
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
return rowsRead, nil
|
||||
}
|
||||
Reference in New Issue
Block a user