feat: refactor error handling to accept max retry attempts as a parameter for improved flexibility
This commit is contained in:
@@ -11,16 +11,6 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Batch struct {
|
|
||||||
Id uuid.UUID
|
|
||||||
ParentId uuid.UUID
|
|
||||||
LowerLimit int64
|
|
||||||
UpperLimit int64
|
|
||||||
IsLowerLimitInclusive bool
|
|
||||||
ShouldUseRange bool
|
|
||||||
RetryCounter int
|
|
||||||
}
|
|
||||||
|
|
||||||
func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) (int64, error) {
|
func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) (int64, error) {
|
||||||
query := `
|
query := `
|
||||||
SELECT
|
SELECT
|
||||||
|
|||||||
@@ -1,103 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ExtractorError struct {
|
|
||||||
Batch
|
|
||||||
LastId int64
|
|
||||||
HasLastId bool
|
|
||||||
Msg string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ExtractorError) Error() string {
|
|
||||||
return e.Msg
|
|
||||||
}
|
|
||||||
|
|
||||||
const maxRetryAttempts = 3
|
|
||||||
|
|
||||||
func extractorErrorHandler(
|
|
||||||
ctx context.Context,
|
|
||||||
chErrorsIn <-chan ExtractorError,
|
|
||||||
chBatchesOut chan<- Batch,
|
|
||||||
chJobErrorsOut chan<- JobError,
|
|
||||||
wgActiveBatches *sync.WaitGroup,
|
|
||||||
) {
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case err, ok := <-chErrorsIn:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err.RetryCounter >= maxRetryAttempts {
|
|
||||||
jobError := JobError{
|
|
||||||
ShouldCancelJob: false,
|
|
||||||
Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
|
|
||||||
Prev: &err,
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chJobErrorsOut <- jobError:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
wgActiveBatches.Done()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
newBatch := err.Batch
|
|
||||||
newBatch.RetryCounter++
|
|
||||||
|
|
||||||
if err.HasLastId {
|
|
||||||
newBatch.ParentId = err.Id
|
|
||||||
newBatch.Id = uuid.New()
|
|
||||||
newBatch.LowerLimit = err.LastId
|
|
||||||
newBatch.IsLowerLimitInclusive = false
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- newBatch:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func ExtractorErrorFromLastRowMssql(lastRow models.UnknownRowValues, indexPrimaryKey int, batch *Batch, previousError error) ExtractorError {
|
|
||||||
lastIdRawValue := lastRow[indexPrimaryKey]
|
|
||||||
|
|
||||||
lastId, ok := ToInt64(lastIdRawValue)
|
|
||||||
if !ok {
|
|
||||||
currentBatch := *batch
|
|
||||||
currentBatch.RetryCounter = maxRetryAttempts
|
|
||||||
return ExtractorError{
|
|
||||||
Batch: currentBatch,
|
|
||||||
HasLastId: true,
|
|
||||||
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return ExtractorError{
|
|
||||||
Batch: *batch,
|
|
||||||
HasLastId: true,
|
|
||||||
LastId: lastId,
|
|
||||||
Msg: previousError.Error(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -20,6 +20,7 @@ func (e *LoaderError) Error() string {
|
|||||||
|
|
||||||
func loaderErrorHandler(
|
func loaderErrorHandler(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
maxRetryAttempts int,
|
||||||
chErrorsIn <-chan LoaderError,
|
chErrorsIn <-chan LoaderError,
|
||||||
chChunksOut chan<- models.Chunk,
|
chChunksOut chan<- models.Chunk,
|
||||||
chJobErrorsOut chan<- custom_errors.JobError,
|
chJobErrorsOut chan<- custom_errors.JobError,
|
||||||
|
|||||||
@@ -67,8 +67,8 @@ func processMigrationJob(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go custom_errors.ExtractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
|
go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
|
||||||
go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
|
go loaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
|
||||||
|
|
||||||
maxExtractors := min(job.MaxExtractors, len(batches))
|
maxExtractors := min(job.MaxExtractors, len(batches))
|
||||||
log.Infof("Starting %d extractor(s)...", maxExtractors)
|
log.Infof("Starting %d extractor(s)...", maxExtractors)
|
||||||
|
|||||||
@@ -20,10 +20,9 @@ func (e *ExtractorError) Error() string {
|
|||||||
return e.Msg
|
return e.Msg
|
||||||
}
|
}
|
||||||
|
|
||||||
const maxRetryAttempts = 3
|
|
||||||
|
|
||||||
func ExtractorErrorHandler(
|
func ExtractorErrorHandler(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
maxRetryAttempts int,
|
||||||
chErrorsIn <-chan ExtractorError,
|
chErrorsIn <-chan ExtractorError,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
chJobErrorsOut chan<- JobError,
|
chJobErrorsOut chan<- JobError,
|
||||||
|
|||||||
Reference in New Issue
Block a user