refactor: implement extractor retry logic and streamline extractor interface

This commit is contained in:
2026-04-17 00:33:49 -05:00
parent 15d1b96849
commit 46597c4ffd
6 changed files with 170 additions and 235 deletions

View File

@@ -0,0 +1,94 @@
package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
func Consume(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := extractWithRetries(
ctx,
extractor,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
)
wgActivePartitions.Done()
if rowsReadResult > 0 {
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
var jobError *custom_errors.JobError
if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}
continue
}
}
}
}

View File

@@ -0,0 +1,70 @@
package extractors
import (
"context"
"errors"
"fmt"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func extractWithRetries(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
delay := time.Duration(time.Second * 1)
currentParitition := partition
for {
rowsRead, err := extractor.Exec(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
totalRowsRead += rowsRead
if err == nil {
return totalRowsRead, nil
}
var exError *custom_errors.ExtractorError
if errors.As(err, &exError) {
currentParitition.RetryCounter++
if currentParitition.RetryCounter > 3 {
return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
Prev: err,
}
}
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
time.Sleep(delay)
continue
}
return totalRowsRead, err
}
}

View File

@@ -3,20 +3,13 @@ package extractors
import (
"context"
"database/sql"
"errors"
"fmt"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type MssqlExtractor struct {
@@ -71,7 +64,7 @@ func buildExtractQueryMssql(
return sbQuery.String()
}
func (mssqlEx *MssqlExtractor) Extract(
func (mssqlEx *MssqlExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
@@ -131,135 +124,3 @@ func (mssqlEx *MssqlExtractor) Extract(
return rowsRead, rows.Err()
}
func (mssqlEx *MssqlExtractor) ExtractWithRetries(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
delay := time.Duration(time.Second * 1)
currentParitition := partition
for {
rowsRead, err := mssqlEx.Extract(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
totalRowsRead += rowsRead
if err == nil {
return totalRowsRead, nil
}
var exError *custom_errors.ExtractorError
if errors.As(err, &exError) {
currentParitition.RetryCounter++
if currentParitition.RetryCounter > 3 {
return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
Prev: err,
}
}
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
time.Sleep(delay)
continue
}
return totalRowsRead, err
}
}
func (mssqlEx *MssqlExtractor) Consume(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := mssqlEx.ExtractWithRetries(
ctx,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
)
wgActivePartitions.Done()
if rowsReadResult > 0 {
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
var jobError *custom_errors.JobError
if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}
continue
}
}
}
}

View File

@@ -5,8 +5,6 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
@@ -52,7 +50,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
}
func (postgresEx *PostgresExtractor) Extract(
func (postgresEx *PostgresExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
@@ -110,71 +108,3 @@ func (postgresEx *PostgresExtractor) Extract(
return rowsRead, nil
}
func (postgresEx *PostgresExtractor) ExtractWithRetries(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
delay := time.Duration(time.Second * 1)
currentParitition := partition
for {
rowsRead, err := postgresEx.Extract(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
totalRowsRead += rowsRead
if err == nil {
return totalRowsRead, nil
}
var exError *custom_errors.ExtractorError
if errors.As(err, &exError) {
currentParitition.RetryCounter++
if currentParitition.RetryCounter > 3 {
return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
Prev: err,
}
}
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
time.Sleep(delay)
continue
}
return totalRowsRead, err
}
}
func (postgresEx *PostgresExtractor) Consume(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
}