refactor: replace specific extractor implementations with a generic extractor; remove mssql and postgres extractor files
This commit is contained in:
@@ -12,13 +12,144 @@ import (
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func Consume(
|
||||
type GenericExtractor struct {
|
||||
db dbwrapper.DbWrapper
|
||||
}
|
||||
|
||||
func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor {
|
||||
return GenericExtractor{db: db}
|
||||
}
|
||||
|
||||
func errorFromLastRow(
|
||||
lastRow models.UnknownRowValues,
|
||||
indexPrimaryKey int,
|
||||
partition models.Partition,
|
||||
previousError error,
|
||||
) error {
|
||||
lastIdRawValue := lastRow[indexPrimaryKey]
|
||||
|
||||
lastId, ok := convert.ToInt64(lastIdRawValue)
|
||||
if !ok {
|
||||
currentPartition := partition
|
||||
currentPartition.RetryCounter = 3
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: currentPartition,
|
||||
HasLastId: true,
|
||||
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: partition,
|
||||
HasLastId: true,
|
||||
LastId: lastId,
|
||||
Msg: previousError.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
func (ex *GenericExtractor) ProcessPartition(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int, error) {
|
||||
rowsRead := 0
|
||||
query := dbwrapper.ExtractionQuery{
|
||||
Schema: tableInfo.Schema,
|
||||
Table: tableInfo.Table,
|
||||
PrimaryKey: tableInfo.PrimaryKey,
|
||||
LowerLimit: dbwrapper.ExtractorQueryLimit{
|
||||
IsValid: partition.HasRange && partition.Range.Min > 0,
|
||||
IsInclusive: partition.Range.IsMinInclusive,
|
||||
Value: partition.Range.Min,
|
||||
},
|
||||
UpperLimit: dbwrapper.ExtractorQueryLimit{
|
||||
IsValid: partition.HasRange && partition.Range.Max > 0,
|
||||
IsInclusive: partition.Range.IsMaxInclusive,
|
||||
Value: partition.Range.Max,
|
||||
},
|
||||
}
|
||||
|
||||
rows, err := ex.db.QueryFromObject(ctx, query)
|
||||
|
||||
if err != nil {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
||||
|
||||
for rows.Next() {
|
||||
rowValues := make([]any, len(columns))
|
||||
scanArgs := make([]any, len(columns))
|
||||
|
||||
for i := range rowValues {
|
||||
scanArgs[i] = &rowValues[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanArgs...); err != nil {
|
||||
if len(batchRows) == 0 {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
rowsRead++
|
||||
|
||||
batchRows = append(batchRows, rowValues)
|
||||
if len(batchRows) >= batchSize {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
if errors.Is(err, ctx.Err()) {
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
return rowsRead, nil
|
||||
}
|
||||
|
||||
func (ex *GenericExtractor) Consume(
|
||||
ctx context.Context,
|
||||
extractor etl.Extractor,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
@@ -59,7 +190,7 @@ func Consume(
|
||||
return
|
||||
}
|
||||
|
||||
rowsReadResult, err := extractor.ProcessPartition(
|
||||
rowsReadResult, err := ex.ProcessPartition(
|
||||
ctx,
|
||||
tableInfo,
|
||||
columns,
|
||||
@@ -101,30 +232,3 @@ func Consume(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func errorFromLastRow(
|
||||
lastRow models.UnknownRowValues,
|
||||
indexPrimaryKey int,
|
||||
partition models.Partition,
|
||||
previousError error,
|
||||
) error {
|
||||
lastIdRawValue := lastRow[indexPrimaryKey]
|
||||
|
||||
lastId, ok := convert.ToInt64(lastIdRawValue)
|
||||
if !ok {
|
||||
currentPartition := partition
|
||||
currentPartition.RetryCounter = 3
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: currentPartition,
|
||||
HasLastId: true,
|
||||
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: partition,
|
||||
HasLastId: true,
|
||||
LastId: lastId,
|
||||
Msg: previousError.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user