package extractors import ( "context" "errors" "fmt" "slices" "strings" "sync" "sync/atomic" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" ) func Consume( ctx context.Context, extractor etl.Extractor, tableInfo config.SourceTableInfo, columns []models.ColumnType, batchSize int, chPartitionsIn <-chan models.Partition, chBatchesOut chan<- models.Batch, chErrorsOut chan<- custom_errors.ExtractorError, chJobErrorsOut chan<- custom_errors.JobError, wgActivePartitions *sync.WaitGroup, rowsRead *int64, ) { indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { return strings.EqualFold(col.Name(), tableInfo.PrimaryKey) }) if indexPrimaryKey == -1 { select { case <-ctx.Done(): return case chJobErrorsOut <- custom_errors.JobError{ ShouldCancelJob: true, Msg: "Primary key not found in provided columns", }: } return } for { if ctx.Err() != nil { return } select { case <-ctx.Done(): return case partition, ok := <-chPartitionsIn: if !ok { return } rowsReadResult, err := extractor.ProcessPartition( ctx, tableInfo, columns, batchSize, partition, indexPrimaryKey, chBatchesOut, ) if rowsReadResult > 0 { atomic.AddInt64(rowsRead, int64(rowsReadResult)) } if err != nil { if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok { select { case <-ctx.Done(): return case chErrorsOut <- *exError: } } else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { select { case <-ctx.Done(): return case chJobErrorsOut <- *jobError: } } else { select { case <-ctx.Done(): return case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}: } } continue } wgActivePartitions.Done() } } } func errorFromLastRow( lastRow models.UnknownRowValues, indexPrimaryKey int, partition models.Partition, previousError error, ) error { lastIdRawValue := lastRow[indexPrimaryKey] lastId, ok := convert.ToInt64(lastIdRawValue) if !ok { currentPartition := partition currentPartition.RetryCounter = 3 return &custom_errors.ExtractorError{ Partition: currentPartition, HasLastId: true, Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), } } return &custom_errors.ExtractorError{ Partition: partition, HasLastId: true, LastId: lastId, Msg: previousError.Error(), } }