update extractor interface

This commit is contained in:
DiegoAlessandroMotta
2026-04-16 23:49:23 -05:00
parent 39c0d99502
commit 1c3db39b21
4 changed files with 78 additions and 22 deletions

View File

@@ -9,6 +9,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
@@ -99,7 +100,7 @@ func errorFromLastRow(
}
}
func (mssqlEx *MssqlExtractor) ProcessPartition(
func (mssqlEx *MssqlExtractor) Extract(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
@@ -107,7 +108,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error) {
) (int64, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
var queryArgs []any
@@ -118,7 +119,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
)
}
rowsRead := 0
var rowsRead int64 = 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
@@ -189,14 +190,67 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
return rowsRead, nil
}
func (mssqlEx *MssqlExtractor) Exec(
// ExtractWithRetries runs Extract for the given partition and retries it when
// Extract fails with a *custom_errors.ExtractorError.
//
// Attempts are counted on the partition's RetryCounter, which is incremented
// before every call to Extract; no further attempt is made once it reaches
// maxAttempts. When the extractor error carries a last id, the next attempt
// uses a new partition id (with ParentId set to the failed partition's id)
// and a range whose minimum is that last id, exclusive, so rows already read
// are not requested again.
//
// It returns the total number of rows read across all attempts together with:
//   - nil when an attempt succeeds;
//   - the error itself, immediately, when it is not an ExtractorError;
//   - the last ExtractorError when the attempts are exhausted;
//   - ctx.Err() when the context is cancelled while waiting between attempts.
//
// NOTE(review): if partition.RetryCounter is already >= maxAttempts on entry,
// Extract is never called and (0, nil) is returned — confirm callers never
// pass such a partition.
func (mssqlEx *MssqlExtractor) ExtractWithRetries(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	chBatchesOut chan<- models.Batch,
) (int64, error) {
	const (
		maxAttempts = 3
		retryDelay  = time.Second
	)

	var totalRowsRead int64
	var lastErr error
	currentPartition := partition

	for currentPartition.RetryCounter < maxAttempts {
		currentPartition.RetryCounter++

		rowsRead, err := mssqlEx.Extract(
			ctx,
			tableInfo,
			columns,
			batchSize,
			currentPartition,
			indexPrimaryKey,
			chBatchesOut,
		)
		// Rows may have been read before a failure, so count them either way.
		if rowsRead > 0 {
			totalRowsRead += rowsRead
		}
		if err == nil {
			return totalRowsRead, nil
		}

		var exError *custom_errors.ExtractorError
		if !errors.As(err, &exError) {
			// Not an extractor error: retrying is not attempted, fail now.
			return totalRowsRead, err
		}
		lastErr = err

		if exError.HasLastId {
			// Resume right after the last id that was read.
			currentPartition.ParentId = exError.Partition.Id
			currentPartition.Id = uuid.New()
			currentPartition.Range.Min = exError.LastId
			currentPartition.Range.IsMinInclusive = false
		}

		// No point in waiting when there is no attempt left.
		if currentPartition.RetryCounter >= maxAttempts {
			break
		}

		// Wait before the next attempt, but give up as soon as ctx is done.
		timer := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return totalRowsRead, ctx.Err()
		case <-timer.C:
		}
	}

	return totalRowsRead, lastErr
}
func (mssqlEx *MssqlExtractor) Consume(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
@@ -231,7 +285,7 @@ func (mssqlEx *MssqlExtractor) Exec(
return
}
rowsReadResult, err := mssqlEx.ProcessPartition(
rowsReadResult, err := mssqlEx.ExtractWithRetries(
ctx,
tableInfo,
columns,
@@ -246,15 +300,8 @@ func (mssqlEx *MssqlExtractor) Exec(
}
if err != nil {
var exError *custom_errors.ExtractorError
var jobError *custom_errors.JobError
if errors.As(err, &exError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if errors.As(err, &jobError) {
if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
@@ -264,7 +311,7 @@ func (mssqlEx *MssqlExtractor) Exec(
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}

View File

@@ -51,7 +51,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
}
func (postgresEx *PostgresExtractor) ProcessPartition(
func (postgresEx *PostgresExtractor) Extract(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
@@ -110,7 +110,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
return rowsRead, nil
}
func (postgresEx *PostgresExtractor) Exec(
func (postgresEx *PostgresExtractor) Consume(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,