refactor: streamline error handling and processing in GenericExtractor; implement partition processing with retries

This commit is contained in:
2026-04-27 01:11:45 -05:00
parent 00459e42e6
commit 6414943cf3
6 changed files with 129 additions and 74 deletions

View File

@@ -111,7 +111,6 @@ func processMigrationJob(
}
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
chPartitions := make(chan models.Partition, job.QueueSize)
chBatchesRaw := make(chan models.Batch, job.QueueSize)
@@ -131,15 +130,6 @@ func processMigrationJob(
}
}()
go custom_errors.ExtractorErrorHandler(
localCtx,
job.Retry,
job.MaxPartitionErrrors,
chExtractorErrors,
chPartitions,
chJobErrors,
&wgActivePartitions,
)
go custom_errors.LoaderErrorHandler(
localCtx,
job.Retry,
@@ -162,7 +152,6 @@ func processMigrationJob(
job.BatchSize,
chPartitions,
chBatchesRaw,
chExtractorErrors,
chJobErrors,
&wgActivePartitions,
&rowsRead,
@@ -216,8 +205,6 @@ func processMigrationJob(
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
close(chPartitions)
log.Debugf("chPartitions is closed (%v)", job.Name)
close(chExtractorErrors)
log.Debugf("chExtractorErrors is closed (%v)", job.Name)
wgExtractors.Wait()
log.Debugf("wgExtractors is empty (%v)", job.Name)

View File

@@ -233,6 +233,8 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
queryString := sbQuery.String()
// logrus.Debugf("Query: %s", queryString)
var queryArgs []any
if q.LowerLimit.IsValid {

View File

@@ -11,6 +11,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/sirupsen/logrus"
)
func (ex *GenericExtractor) Consume(
@@ -20,8 +21,7 @@ func (ex *GenericExtractor) Consume(
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
chErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
@@ -33,7 +33,7 @@ func (ex *GenericExtractor) Consume(
select {
case <-ctx.Done():
return
case chJobErrorsOut <- custom_errors.JobError{
case chErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
@@ -55,7 +55,7 @@ func (ex *GenericExtractor) Consume(
return
}
rowsReadResult, err := ex.ProcessPartition(
rowsReadResult, err := ex.ProcessPartitionWithRetries(
ctx,
tableInfo,
columns,
@@ -64,38 +64,29 @@ func (ex *GenericExtractor) Consume(
indexPrimaryKey,
chBatchesOut,
)
wgActivePartitions.Done()
if rowsReadResult > 0 {
// current := atomic.LoadInt64(rowsRead)
// logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
current := atomic.LoadInt64(rowsRead)
logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}
continue
}
wgActivePartitions.Done()
}
}
}

View File

@@ -1,7 +1,11 @@
package extractors
import (
"context"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type GenericExtractor struct {
@@ -11,3 +15,28 @@ type GenericExtractor struct {
func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor {
return GenericExtractor{db: db}
}
func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error {
select {
case chBatchesOut <- batch:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func flush(
ctx context.Context,
partition *models.Partition,
batchSize int,
batchRows []models.UnknownRowValues,
chBatchesOut chan<- models.Batch,
) error {
if len(batchRows) == 0 {
return nil
}
batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
return sendBatch(ctx, chBatchesOut, batch)
}

View File

@@ -0,0 +1,69 @@
package extractors
import (
"context"
"errors"
"fmt"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
// "github.com/sirupsen/logrus"
)
func (ex *GenericExtractor) ProcessPartitionWithRetries(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
delay := time.Duration(time.Second * 1)
currentParitition := partition
for {
rowsRead, err := ex.ProcessPartition(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)
totalRowsRead += rowsRead
if err == nil {
return totalRowsRead, nil
}
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
currentParitition.RetryCounter++
if currentParitition.RetryCounter > 3 {
return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
Prev: err,
}
}
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
time.Sleep(delay)
continue
}
return totalRowsRead, err
}
}

View File

@@ -2,7 +2,6 @@ package extractors
import (
"context"
"errors"
"fmt"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
@@ -10,10 +9,10 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
// "github.com/sirupsen/logrus"
)
func errorFromLastRow(
func errorFromLastPartitionRow(
lastRow models.UnknownRowValues,
indexPrimaryKey int,
partition models.Partition,
@@ -48,8 +47,7 @@ func (ex *GenericExtractor) ProcessPartition(
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error) {
rowsRead := 0
) (int64, error) {
query := dbwrapper.ExtractionQuery{
Schema: tableInfo.Schema,
Table: tableInfo.Table,
@@ -67,15 +65,15 @@ func (ex *GenericExtractor) ProcessPartition(
},
}
// logrus.Debugf("Querying with: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
rows, err := ex.db.QueryFromObject(ctx, query)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
return 0, err
}
defer rows.Close()
batchRows := make([]models.UnknownRowValues, 0, batchSize)
var rowsRead int64 = 0
for rows.Next() {
rowValues := make([]any, len(columns))
@@ -87,53 +85,32 @@ func (ex *GenericExtractor) ProcessPartition(
if err := rows.Scan(scanArgs...); err != nil {
if len(batchRows) == 0 {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
return rowsRead, err
}
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
return rowsRead, err
}
lastRow := batchRows[len(batchRows)-1]
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err)
}
rowsRead++
batchRows = append(batchRows, rowValues)
if len(batchRows) >= batchSize {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
// logrus.Debugf("Batch size reached, flushing batch with %v rows (rowsRead=%v)", len(batchRows), rowsRead)
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
// logrus.Warnf("Error flushing rows: %v", err)
return rowsRead, err
}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
}
}
if err := rows.Err(); err != nil {
if errors.Is(err, ctx.Err()) {
return rowsRead, ctx.Err()
}
if len(batchRows) > 0 {
lastRow := batchRows[len(batchRows)-1]
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
}
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
return rowsRead, err
}
if len(batchRows) > 0 {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
}
return rowsRead, nil
return rowsRead, rows.Err()
}