1 Commits

Author SHA1 Message Date
DiegoAlessandroMotta
1c3db39b21 update extractor interface 2026-04-16 23:49:23 -05:00
4 changed files with 78 additions and 22 deletions

View File

@@ -129,7 +129,7 @@ func processMigrationJob(
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
extractor.Exec( extractor.Consume(
localCtx, localCtx,
job.SourceTable, job.SourceTable,
sourceColTypes, sourceColTypes,

View File

@@ -9,6 +9,7 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
@@ -99,7 +100,7 @@ func errorFromLastRow(
} }
} }
func (mssqlEx *MssqlExtractor) ProcessPartition( func (mssqlEx *MssqlExtractor) Extract(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -107,7 +108,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
partition models.Partition, partition models.Partition,
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int, error) { ) (int64, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive) query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
var queryArgs []any var queryArgs []any
@@ -118,7 +119,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
) )
} }
rowsRead := 0 var rowsRead int64 = 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...) rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
if err != nil { if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()} return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
@@ -189,14 +190,67 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
return rowsRead, nil return rowsRead, nil
} }
func (mssqlEx *MssqlExtractor) Exec( func (mssqlEx *MssqlExtractor) ExtractWithRetries(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
var fatalErr error
delay := time.Duration(time.Second * 1)
currentParitition := partition
for fatalErr != nil || currentParitition.RetryCounter < 3 {
currentParitition.RetryCounter++
rowsRead, err := mssqlEx.Extract(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
if rowsRead > 0 {
totalRowsRead += int64(rowsRead)
}
if err != nil {
var exError *custom_errors.ExtractorError
if errors.As(err, &exError) {
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
time.Sleep(delay)
} else {
fatalErr = err
}
continue
}
break
}
return totalRowsRead, fatalErr
}
func (mssqlEx *MssqlExtractor) Consume(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
batchSize int, batchSize int,
chPartitionsIn <-chan models.Partition, chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError, chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup, wgActivePartitions *sync.WaitGroup,
rowsRead *int64, rowsRead *int64,
@@ -231,7 +285,7 @@ func (mssqlEx *MssqlExtractor) Exec(
return return
} }
rowsReadResult, err := mssqlEx.ProcessPartition( rowsReadResult, err := mssqlEx.ExtractWithRetries(
ctx, ctx,
tableInfo, tableInfo,
columns, columns,
@@ -246,15 +300,8 @@ func (mssqlEx *MssqlExtractor) Exec(
} }
if err != nil { if err != nil {
var exError *custom_errors.ExtractorError
var jobError *custom_errors.JobError var jobError *custom_errors.JobError
if errors.As(err, &exError) { if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if errors.As(err, &jobError) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@@ -264,7 +311,7 @@ func (mssqlEx *MssqlExtractor) Exec(
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
} }
} }

View File

@@ -51,7 +51,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey) return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
} }
func (postgresEx *PostgresExtractor) ProcessPartition( func (postgresEx *PostgresExtractor) Extract(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -110,7 +110,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
return rowsRead, nil return rowsRead, nil
} }
func (postgresEx *PostgresExtractor) Exec( func (postgresEx *PostgresExtractor) Consume(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,

View File

@@ -10,7 +10,7 @@ import (
) )
type Extractor interface { type Extractor interface {
ProcessPartition( Extract(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -18,16 +18,25 @@ type Extractor interface {
partition models.Partition, partition models.Partition,
indexPrimaryKey int, indexPrimaryKey int,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
) (int, error) ) (int64, error)
Exec( ExtractWithRetries(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error)
Consume(
ctx context.Context, ctx context.Context,
tableInfo config.SourceTableInfo, tableInfo config.SourceTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
batchSize int, batchSize int,
chPartitionsIn <-chan models.Partition, chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError, chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup, wgActivePartitions *sync.WaitGroup,
rowsRead *int64, rowsRead *int64,