refactor: implement Consume method in GenericLoader; enhance error handling in ProcessBatch
This commit is contained in:
63
internal/app/etl/loaders/consume.go
Normal file
63
internal/app/etl/loaders/consume.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
func (gl *GenericLoader) Consume(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
columns []models.ColumnType,
|
||||
retryConfig config.RetryConfig,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
rowsLoaded *int64,
|
||||
) {
|
||||
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
||||
return col.Name()
|
||||
})
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
wgActiveBatches.Done()
|
||||
|
||||
if err != nil {
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||
}
|
||||
}
|
||||
} else {
|
||||
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user