Files
go-migrate/internal/app/etl/extractors/extract-with-retries.go

71 lines
1.5 KiB
Go

package extractors
import (
"context"
"errors"
"fmt"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func extractWithRetries(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
delay := time.Duration(time.Second * 1)
currentParitition := partition
for {
rowsRead, err := extractor.Exec(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
totalRowsRead += rowsRead
if err == nil {
return totalRowsRead, nil
}
var exError *custom_errors.ExtractorError
if errors.As(err, &exError) {
currentParitition.RetryCounter++
if currentParitition.RetryCounter > 3 {
return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
Prev: err,
}
}
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
time.Sleep(delay)
continue
}
return totalRowsRead, err
}
}