refactor: implement loaderAccumulator for batch processing; streamline error handling in Consume
This commit is contained in:
@@ -13,6 +13,62 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type loaderAccumulator struct {
|
||||
batchSize int
|
||||
rows []models.UnknownRowValues
|
||||
parents []models.BatchRef
|
||||
pendingDone int
|
||||
}
|
||||
|
||||
func (a *loaderAccumulator) add(batch models.Batch) {
|
||||
a.rows = append(a.rows, batch.Rows...)
|
||||
a.parents = append(a.parents, models.BatchRef{Id: batch.Id})
|
||||
a.pendingDone++
|
||||
}
|
||||
|
||||
func (a *loaderAccumulator) ready() bool {
|
||||
return len(a.rows) >= a.batchSize
|
||||
}
|
||||
|
||||
func (a *loaderAccumulator) drainPending(wg *sync.WaitGroup) {
|
||||
for range a.pendingDone {
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
|
||||
func sendLoadError(
|
||||
ctx context.Context,
|
||||
err error,
|
||||
retryConfig config.RetryConfig,
|
||||
failedBatchesCount *int32,
|
||||
chErrorsOut chan<- custom_errors.JobError,
|
||||
) bool {
|
||||
atomic.AddInt32(failedBatchesCount, 1)
|
||||
|
||||
var jobErr custom_errors.JobError
|
||||
if je, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
jobErr = *je
|
||||
} else {
|
||||
jobErr = custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case chErrorsOut <- jobErr:
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (gl *GenericLoader) Consume(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
@@ -29,58 +85,29 @@ func (gl *GenericLoader) Consume(
|
||||
return col.Name()
|
||||
})
|
||||
|
||||
var accRows []models.UnknownRowValues
|
||||
var parentBatches []models.BatchRef
|
||||
pendingDone := 0
|
||||
|
||||
defer func() {
|
||||
for range pendingDone {
|
||||
wgActiveBatches.Done()
|
||||
}
|
||||
}()
|
||||
acc := &loaderAccumulator{batchSize: batchSize}
|
||||
defer acc.drainPending(wgActiveBatches)
|
||||
|
||||
flush := func() bool {
|
||||
if len(accRows) == 0 {
|
||||
if len(acc.rows) == 0 {
|
||||
return true
|
||||
}
|
||||
count := len(parentBatches)
|
||||
count := len(acc.parents)
|
||||
superBatch := models.Batch{
|
||||
Id: uuid.New(),
|
||||
ParentBatches: parentBatches,
|
||||
Rows: accRows,
|
||||
ParentBatches: acc.parents,
|
||||
Rows: acc.rows,
|
||||
}
|
||||
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, superBatch)
|
||||
for range count {
|
||||
wgActiveBatches.Done()
|
||||
}
|
||||
pendingDone -= count
|
||||
accRows = nil
|
||||
parentBatches = nil
|
||||
acc.pendingDone -= count
|
||||
acc.rows = nil
|
||||
acc.parents = nil
|
||||
|
||||
if err != nil {
|
||||
atomic.AddInt32(failedBatchesCount, 1)
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case chErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||
}
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
return sendLoadError(ctx, err, retryConfig, failedBatchesCount, chErrorsOut)
|
||||
}
|
||||
|
||||
current := atomic.LoadInt64(rowsLoaded)
|
||||
@@ -90,13 +117,10 @@ func (gl *GenericLoader) Consume(
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
flush()
|
||||
@@ -106,45 +130,20 @@ func (gl *GenericLoader) Consume(
|
||||
if batchSize <= 0 {
|
||||
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
|
||||
wgActiveBatches.Done()
|
||||
|
||||
if err != nil {
|
||||
atomic.AddInt32(failedBatchesCount, 1)
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||
}
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
|
||||
return
|
||||
}
|
||||
if !sendLoadError(ctx, err, retryConfig, failedBatchesCount, chErrorsOut) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
current := atomic.LoadInt64(rowsLoaded)
|
||||
logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
|
||||
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||
continue
|
||||
}
|
||||
|
||||
pendingDone++
|
||||
accRows = append(accRows, batch.Rows...)
|
||||
parentBatches = append(parentBatches, models.BatchRef{Id: batch.Id})
|
||||
|
||||
if len(accRows) >= batchSize {
|
||||
acc.add(batch)
|
||||
if acc.ready() {
|
||||
if !flush() {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user