package extractors

import (
	"context"
	"errors"
	"fmt"
	"time"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
)

// extractWithRetries calls extractor.Exec for the given partition and retries
// the call whenever it fails with a *custom_errors.ExtractorError.
//
// Between attempts it waits a fixed delay; the wait is abandoned as soon as
// ctx is done, in which case ctx.Err() is returned.
//
// When the extractor error reports a last id (HasLastId), the next attempt
// runs on a derived partition: it gets a fresh Id, its ParentId is the Id of
// the partition carried by the error, and its range minimum becomes the
// reported LastId, exclusive. Otherwise the same partition is retried as is.
//
// The retry counter is the one stored on the partition passed in, so it
// starts from whatever value the caller supplied. Once it exceeds the retry
// limit, a *custom_errors.JobError wrapping the last error is returned.
//
// Any error that is not an ExtractorError is returned unchanged without
// retrying. The returned count is the sum of the rows reported by every
// attempt, and it is returned alongside the error on failure as well.
func extractWithRetries(
	ctx context.Context,
	extractor etl.Extractor,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	chBatchesOut chan<- models.Batch,
) (int64, error) {
	const (
		maxRetries = 3
		retryDelay = time.Second
	)

	var totalRowsRead int64
	currentPartition := partition

	for {
		rowsRead, err := extractor.Exec(
			ctx,
			tableInfo,
			columns,
			batchSize,
			currentPartition,
			indexPrimaryKey,
			chBatchesOut,
		)
		// Rows are counted even for a failed attempt.
		totalRowsRead += rowsRead

		if err == nil {
			return totalRowsRead, nil
		}

		// Only extractor errors are retryable; everything else is final.
		var exError *custom_errors.ExtractorError
		if !errors.As(err, &exError) {
			return totalRowsRead, err
		}

		currentPartition.RetryCounter++
		if currentPartition.RetryCounter > maxRetries {
			return totalRowsRead, &custom_errors.JobError{
				Msg:  fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
				Prev: err,
			}
		}

		if exError.HasLastId {
			// Continue right after the last id the failed attempt reported.
			currentPartition.ParentId = exError.Partition.Id
			currentPartition.Id = uuid.New()
			currentPartition.Range.Min = exError.LastId
			currentPartition.Range.IsMinInclusive = false
		}

		// Back off before the next attempt, but stop waiting (and retrying)
		// as soon as the context is cancelled or its deadline passes.
		timer := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return totalRowsRead, ctx.Err()
		case <-timer.C:
		}
	}
}