78 lines
1.7 KiB
Go
78 lines
1.7 KiB
Go
package loaders
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
)
|
|
|
|
func (gl *GenericLoader) Consume(
|
|
ctx context.Context,
|
|
tableInfo config.TargetTableInfo,
|
|
columns []models.ColumnType,
|
|
retryConfig config.RetryConfig,
|
|
chBatchesIn <-chan models.Batch,
|
|
chErrorsOut chan<- custom_errors.JobError,
|
|
wgActiveBatches *sync.WaitGroup,
|
|
rowsLoaded *int64,
|
|
failedBatchesCount *int32,
|
|
) {
|
|
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
|
return col.Name()
|
|
})
|
|
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case batch, ok := <-chBatchesIn:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
|
|
wgActiveBatches.Done()
|
|
|
|
if err != nil {
|
|
atomic.AddInt32(failedBatchesCount, 1)
|
|
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case chErrorsOut <- *jobError:
|
|
}
|
|
} else {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
|
}
|
|
}
|
|
|
|
currentFBCount := atomic.LoadInt32(failedBatchesCount)
|
|
if currentFBCount > int32(retryConfig.MaxFailedBatchesLoad) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
|
|
return
|
|
}
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
|
}
|
|
}
|
|
}
|