Compare commits
1 Commits
refactor/r
...
1c3db39b21
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c3db39b21 |
@@ -129,7 +129,7 @@ func processMigrationJob(
|
|||||||
|
|
||||||
for range maxExtractors {
|
for range maxExtractors {
|
||||||
wgExtractors.Go(func() {
|
wgExtractors.Go(func() {
|
||||||
extractor.Exec(
|
extractor.Consume(
|
||||||
localCtx,
|
localCtx,
|
||||||
job.SourceTable,
|
job.SourceTable,
|
||||||
sourceColTypes,
|
sourceColTypes,
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
||||||
@@ -99,7 +100,7 @@ func errorFromLastRow(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mssqlEx *MssqlExtractor) ProcessPartition(
|
func (mssqlEx *MssqlExtractor) Extract(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.SourceTableInfo,
|
tableInfo config.SourceTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
@@ -107,7 +108,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
|
|||||||
partition models.Partition,
|
partition models.Partition,
|
||||||
indexPrimaryKey int,
|
indexPrimaryKey int,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
) (int, error) {
|
) (int64, error) {
|
||||||
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
|
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
|
||||||
|
|
||||||
var queryArgs []any
|
var queryArgs []any
|
||||||
@@ -118,7 +119,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
rowsRead := 0
|
var rowsRead int64 = 0
|
||||||
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
|
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||||
@@ -189,14 +190,67 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
|
|||||||
return rowsRead, nil
|
return rowsRead, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mssqlEx *MssqlExtractor) Exec(
|
func (mssqlEx *MssqlExtractor) ExtractWithRetries(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo config.SourceTableInfo,
|
||||||
|
columns []models.ColumnType,
|
||||||
|
batchSize int,
|
||||||
|
partition models.Partition,
|
||||||
|
indexPrimaryKey int,
|
||||||
|
chBatchesOut chan<- models.Batch,
|
||||||
|
) (int64, error) {
|
||||||
|
var totalRowsRead int64
|
||||||
|
var fatalErr error
|
||||||
|
delay := time.Duration(time.Second * 1)
|
||||||
|
currentParitition := partition
|
||||||
|
|
||||||
|
for fatalErr != nil || currentParitition.RetryCounter < 3 {
|
||||||
|
currentParitition.RetryCounter++
|
||||||
|
rowsRead, err := mssqlEx.Extract(
|
||||||
|
ctx,
|
||||||
|
tableInfo,
|
||||||
|
columns,
|
||||||
|
batchSize,
|
||||||
|
currentParitition,
|
||||||
|
indexPrimaryKey,
|
||||||
|
chBatchesOut,
|
||||||
|
)
|
||||||
|
|
||||||
|
if rowsRead > 0 {
|
||||||
|
totalRowsRead += int64(rowsRead)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
var exError *custom_errors.ExtractorError
|
||||||
|
if errors.As(err, &exError) {
|
||||||
|
if exError.HasLastId {
|
||||||
|
currentParitition.ParentId = exError.Partition.Id
|
||||||
|
currentParitition.Id = uuid.New()
|
||||||
|
currentParitition.Range.Min = exError.LastId
|
||||||
|
currentParitition.Range.IsMinInclusive = false
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(delay)
|
||||||
|
} else {
|
||||||
|
fatalErr = err
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalRowsRead, fatalErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mssqlEx *MssqlExtractor) Consume(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.SourceTableInfo,
|
tableInfo config.SourceTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
batchSize int,
|
batchSize int,
|
||||||
chPartitionsIn <-chan models.Partition,
|
chPartitionsIn <-chan models.Partition,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
chErrorsOut chan<- custom_errors.ExtractorError,
|
|
||||||
chJobErrorsOut chan<- custom_errors.JobError,
|
chJobErrorsOut chan<- custom_errors.JobError,
|
||||||
wgActivePartitions *sync.WaitGroup,
|
wgActivePartitions *sync.WaitGroup,
|
||||||
rowsRead *int64,
|
rowsRead *int64,
|
||||||
@@ -231,7 +285,7 @@ func (mssqlEx *MssqlExtractor) Exec(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rowsReadResult, err := mssqlEx.ProcessPartition(
|
rowsReadResult, err := mssqlEx.ExtractWithRetries(
|
||||||
ctx,
|
ctx,
|
||||||
tableInfo,
|
tableInfo,
|
||||||
columns,
|
columns,
|
||||||
@@ -246,15 +300,8 @@ func (mssqlEx *MssqlExtractor) Exec(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var exError *custom_errors.ExtractorError
|
|
||||||
var jobError *custom_errors.JobError
|
var jobError *custom_errors.JobError
|
||||||
if errors.As(err, &exError) {
|
if errors.As(err, &jobError) {
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case chErrorsOut <- *exError:
|
|
||||||
}
|
|
||||||
} else if errors.As(err, &jobError) {
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
@@ -264,7 +311,7 @@ func (mssqlEx *MssqlExtractor) Exec(
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
|
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
|
|||||||
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
|
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (postgresEx *PostgresExtractor) ProcessPartition(
|
func (postgresEx *PostgresExtractor) Extract(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.SourceTableInfo,
|
tableInfo config.SourceTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
@@ -110,7 +110,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
|
|||||||
return rowsRead, nil
|
return rowsRead, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (postgresEx *PostgresExtractor) Exec(
|
func (postgresEx *PostgresExtractor) Consume(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.SourceTableInfo,
|
tableInfo config.SourceTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Extractor interface {
|
type Extractor interface {
|
||||||
ProcessPartition(
|
Extract(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.SourceTableInfo,
|
tableInfo config.SourceTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
@@ -18,16 +18,25 @@ type Extractor interface {
|
|||||||
partition models.Partition,
|
partition models.Partition,
|
||||||
indexPrimaryKey int,
|
indexPrimaryKey int,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
) (int, error)
|
) (int64, error)
|
||||||
|
|
||||||
Exec(
|
ExtractWithRetries(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo config.SourceTableInfo,
|
||||||
|
columns []models.ColumnType,
|
||||||
|
batchSize int,
|
||||||
|
partition models.Partition,
|
||||||
|
indexPrimaryKey int,
|
||||||
|
chBatchesOut chan<- models.Batch,
|
||||||
|
) (int64, error)
|
||||||
|
|
||||||
|
Consume(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.SourceTableInfo,
|
tableInfo config.SourceTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
batchSize int,
|
batchSize int,
|
||||||
chPartitionsIn <-chan models.Partition,
|
chPartitionsIn <-chan models.Partition,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
chErrorsOut chan<- custom_errors.ExtractorError,
|
|
||||||
chJobErrorsOut chan<- custom_errors.JobError,
|
chJobErrorsOut chan<- custom_errors.JobError,
|
||||||
wgActivePartitions *sync.WaitGroup,
|
wgActivePartitions *sync.WaitGroup,
|
||||||
rowsRead *int64,
|
rowsRead *int64,
|
||||||
|
|||||||
Reference in New Issue
Block a user