Files

109 lines
2.5 KiB
Go

package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/sirupsen/logrus"
)
// Consume is a partition worker loop. It receives partitions from
// chPartitionsIn, hands each one to ProcessPartitionWithRetries, and reports
// failures on chErrorsOut. It returns when chPartitionsIn is closed, when ctx
// is done, or after it has sent a job-cancelling error.
//
// Before looping it looks up tableInfo.PrimaryKey in columns using a
// case-insensitive name comparison. If no column matches, it sends a
// JobError with ShouldCancelJob set and returns without reading any
// partition.
//
// For every partition received it:
//   - calls wgActivePartitions.Done() once processing has returned;
//   - adds the returned row count to *rowsRead when that count is positive;
//   - on error, increments *failedPartitionsCount and forwards the error on
//     chErrorsOut: the *custom_errors.JobError located by errors.AsType if
//     there is one, otherwise a new non-cancelling JobError wrapping err;
//   - then, if the failure counter exceeds retryConfig.MaxFailedPartitions,
//     sends a job-cancelling JobError and returns.
//
// Every send on chErrorsOut is abandoned if ctx is cancelled first, in which
// case the method returns immediately.
func (ex *GenericExtractor) Consume(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	retryConfig config.RetryConfig,
	chPartitionsIn <-chan models.Partition,
	chBatchesOut chan<- models.Batch,
	chErrorsOut chan<- custom_errors.JobError,
	wgActivePartitions *sync.WaitGroup,
	rowsRead *int64,
	failedPartitionsCount *int32,
	fromJsonColumns []config.FromJsonItem,
) {
	// report delivers jobErr on chErrorsOut unless ctx is cancelled first.
	// It returns false when the send was abandoned because of cancellation.
	report := func(jobErr custom_errors.JobError) bool {
		select {
		case <-ctx.Done():
			return false
		case chErrorsOut <- jobErr:
			return true
		}
	}

	matchesPrimaryKey := func(col models.ColumnType) bool {
		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
	}
	pkIndex := slices.IndexFunc(columns, matchesPrimaryKey)
	if pkIndex < 0 {
		// Whether or not the error was delivered, this worker cannot proceed.
		report(custom_errors.JobError{
			ShouldCancelJob: true,
			Msg:             "Primary key not found in provided columns",
		})
		return
	}

	for {
		// Checked explicitly before the select: when several select cases are
		// ready Go picks one at random, so this gives cancellation priority
		// over an already-available partition.
		if ctx.Err() != nil {
			return
		}

		var partition models.Partition
		select {
		case <-ctx.Done():
			return
		case next, open := <-chPartitionsIn:
			if !open {
				// Channel closed: no more partitions will arrive.
				return
			}
			partition = next
		}

		extracted, err := ex.ProcessPartitionWithRetries(
			ctx,
			tableInfo,
			columns,
			batchSize,
			partition,
			pkIndex,
			retryConfig,
			chBatchesOut,
			fromJsonColumns,
		)
		// The partition is finished (successfully or not) as far as the
		// wait group is concerned.
		wgActivePartitions.Done()

		if extracted > 0 {
			// The logged total is the value observed before this
			// partition's rows are added.
			before := atomic.LoadInt64(rowsRead)
			logrus.Debugf("Rows read (partition extracted): +%v [current=%v] (%s.%s)", extracted, before, tableInfo.Schema, tableInfo.Table)
			atomic.AddInt64(rowsRead, int64(extracted))
		}

		if err == nil {
			continue
		}

		atomic.AddInt32(failedPartitionsCount, 1)

		// Forward an existing JobError unchanged; wrap anything else in a
		// JobError that does not cancel the job by itself.
		var outgoing custom_errors.JobError
		if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
			outgoing = *jobError
		} else {
			outgoing = custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}
		}
		if !report(outgoing) {
			return
		}

		// The counter is shared through a pointer, so re-read it rather than
		// assuming only this call has incremented it.
		failedSoFar := atomic.LoadInt32(failedPartitionsCount)
		if failedSoFar > int32(retryConfig.MaxFailedPartitions) {
			// Stop this worker whether or not the cancel request was delivered.
			report(custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed partitions reached"})
			return
		}
	}
}