package extractors

import (
	"context"
	"errors"
	"slices"
	"strings"
	"sync"
	"sync/atomic"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)

// Consume receives partitions from chPartitionsIn and runs extractWithRetries
// on each one until the channel is closed or ctx is cancelled.
//
// Before processing anything it looks up tableInfo.PrimaryKey among columns
// (case-insensitive name match). If the column is missing, a JobError with
// ShouldCancelJob set is sent on chErrorsOut and Consume returns without
// reading any partition.
//
// For every partition received:
//   - a positive row count returned by extractWithRetries is added atomically
//     to *rowsRead, regardless of whether an error was also returned;
//   - wgActivePartitions.Done() is called exactly once;
//   - a non-nil error is forwarded on chErrorsOut, either as the wrapped
//     *custom_errors.JobError found via errors.As, or wrapped in a new
//     JobError with ShouldCancelJob false.
//
// Every send on chErrorsOut is abandoned if ctx is cancelled first, in which
// case Consume returns.
func Consume(
	ctx context.Context,
	extractor etl.Extractor,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	chPartitionsIn <-chan models.Partition,
	chBatchesOut chan<- models.Batch,
	chErrorsOut chan<- custom_errors.JobError,
	wgActivePartitions *sync.WaitGroup,
	rowsRead *int64,
) {
	// reportError delivers jobErr on chErrorsOut unless ctx is cancelled
	// first. It reports whether the error was actually delivered.
	reportError := func(jobErr custom_errors.JobError) bool {
		select {
		case <-ctx.Done():
			return false
		case chErrorsOut <- jobErr:
			return true
		}
	}

	indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
	})
	if indexPrimaryKey == -1 {
		// Whether or not the send succeeds, there is nothing more to do.
		reportError(custom_errors.JobError{
			ShouldCancelJob: true,
			Msg:             "Primary key not found in provided columns",
		})
		return
	}

	for {
		// select picks at random among ready cases, so check cancellation
		// explicitly first to avoid starting a new partition after ctx is done.
		if ctx.Err() != nil {
			return
		}

		select {
		case <-ctx.Done():
			return

		case partition, ok := <-chPartitionsIn:
			if !ok {
				return
			}

			rowsReadResult, err := extractWithRetries(
				ctx,
				extractor,
				tableInfo,
				columns,
				batchSize,
				partition,
				indexPrimaryKey,
				chBatchesOut,
			)

			// Publish the row count BEFORE signalling completion: a goroutine
			// that returns from wgActivePartitions.Wait() and then reads
			// *rowsRead must already see this partition's rows.
			if rowsReadResult > 0 {
				atomic.AddInt64(rowsRead, rowsReadResult)
			}
			wgActivePartitions.Done()

			if err == nil {
				continue
			}

			// NOTE(review): the error is reported after Done(), as before.
			// If the coordinator cancels ctx right after Wait() returns, this
			// error can be dropped — confirm that is acceptable to callers.
			var jobErr custom_errors.JobError
			var wrapped *custom_errors.JobError
			if errors.As(err, &wrapped) {
				jobErr = *wrapped
			} else {
				jobErr = custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}
			}
			if !reportError(jobErr) {
				return
			}
		}
	}
}