package extractors

import (
	"context"
	"errors"
	"fmt"
	"time"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
	// "github.com/sirupsen/logrus"
)

// ProcessPartitionWithRetries runs ProcessPartition for the given partition in
// a loop, retrying whenever the failure is a *custom_errors.ExtractorError.
//
// On every retryable failure the partition's RetryCounter is incremented. Once
// it reaches retryConfig.Attempts, a *custom_errors.JobError wrapping the last
// error is returned. If the extractor error reports HasLastId, the next attempt
// uses a derived partition: a fresh Id, ParentId set to the failed partition's
// Id, and Range.Min moved to LastId (exclusive), presumably so rows that were
// already read are not read again (confirm against ProcessPartition).
//
// Between attempts the method waits for the delay computed by
// custom_errors.ComputeBackoffDelay. The wait is interrupted if ctx is done, in
// which case an error wrapping ctx.Err() is returned.
//
// The first return value is the sum of the row counts reported by every
// attempt and is returned alongside any error. Errors that are not extractor
// errors are returned unchanged without retrying.
func (ex *GenericExtractor) ProcessPartitionWithRetries(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	retryConfig config.RetryConfig,
	chBatchesOut chan<- models.Batch,
	fromJsonColumns []config.FromJsonItem,
) (int64, error) {
	var totalRowsRead int64
	currentPartition := partition

	for {
		rowsRead, err := ex.ProcessPartition(
			ctx,
			tableInfo,
			columns,
			batchSize,
			currentPartition,
			indexPrimaryKey,
			chBatchesOut,
			fromJsonColumns,
		)
		// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)

		// Rows reported by failed attempts are counted too.
		totalRowsRead += rowsRead
		if err == nil {
			return totalRowsRead, nil
		}

		exError, ok := errors.AsType[*custom_errors.ExtractorError](err)
		if !ok {
			// Only extractor errors are treated as retryable.
			return totalRowsRead, err
		}

		currentPartition.RetryCounter++
		if currentPartition.RetryCounter >= retryConfig.Attempts {
			return totalRowsRead, &custom_errors.JobError{
				Msg:  fmt.Sprintf("Partition %v reached max retries (%d)", currentPartition.Id, currentPartition.RetryCounter),
				Prev: err,
			}
		}

		if exError.HasLastId {
			// Continue from just after the reported id under a new partition
			// identity that points back at the failed one.
			currentPartition.ParentId = exError.Partition.Id
			currentPartition.Id = uuid.New()
			currentPartition.Range.Min = exError.LastId
			currentPartition.Range.IsMinInclusive = false
		}

		delay := custom_errors.ComputeBackoffDelay(
			currentPartition.RetryCounter,
			retryConfig.BaseDelayMs,
			retryConfig.MaxDelayMs,
			retryConfig.MaxJitterMs,
		)

		// Wait out the backoff, but give up immediately if the context is
		// cancelled so shutdown is not held up by a pending retry. The last
		// extractor error is formatted with %v (not wrapped) so the returned
		// error is not itself mistaken for a retryable extractor error.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return totalRowsRead, fmt.Errorf(
				"partition %v retry backoff interrupted: %w (last error: %v)",
				currentPartition.Id, ctx.Err(), err,
			)
		case <-timer.C:
		}
	}
}