package extractors import ( "context" "fmt" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" ) func errorFromLastPartitionRow( lastRow models.UnknownRowValues, indexPrimaryKey int, partition models.Partition, previousError error, ) error { lastIdRawValue := lastRow[indexPrimaryKey] lastId, ok := convert.ToInt64(lastIdRawValue) if !ok { currentPartition := partition currentPartition.RetryCounter = 3 return &custom_errors.ExtractorError{ Partition: currentPartition, HasLastId: true, Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), } } return &custom_errors.ExtractorError{ Partition: partition, HasLastId: true, LastId: lastId, Msg: previousError.Error(), } } func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error { select { case chBatchesOut <- batch: return nil case <-ctx.Done(): return ctx.Err() } } func flush( ctx context.Context, partition *models.Partition, batchSize int, batchRows []models.UnknownRowValues, chBatchesOut chan<- models.Batch, ) error { if len(batchRows) == 0 { return nil } batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows} batchRows = make([]models.UnknownRowValues, 0, batchSize) return sendBatch(ctx, chBatchesOut, batch) }