package transformers

import (
	"context"
	"errors"
	"sync"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
)

// Consume reads batches from chBatchesIn, applies the transformation plan to
// each of them and forwards the result on chBatchesOut until chBatchesIn is
// closed, ctx is cancelled, or a transformation fails.
//
// The plan is the result of computeTransformationPlan(columns) followed by the
// result of computeStorageTransformationPlan. When the combined plan is empty,
// batches are not passed through ProcessBatchWithRetries at all.
//
// Re-batching depends on batchSize:
//   - batchSize <= 0: every incoming batch is forwarded unchanged.
//   - batchSize > 0: rows are accumulated across incoming batches and a merged
//     batch is emitted as soon as at least batchSize rows are buffered. The
//     merged batch gets a fresh Id, the PartitionId of the first batch that
//     contributed to it, and ParentBatchesId listing the Ids of all
//     contributing batches. Any remainder is emitted when chBatchesIn closes.
//
// When ProcessBatchWithRetries fails for a reason other than ctx's own error,
// a JobError is sent on chJobErrorsOut (the returned *custom_errors.JobError
// if the error is one, otherwise a new JobError with ShouldCancelJob set) and
// Consume returns without flushing the rows accumulated so far.
//
// wgActiveBatches is incremented once for every batch delivered on
// chBatchesOut. The matching Done call is not made here; presumably the
// downstream consumer of chBatchesOut is responsible for it (verify against
// the caller).
func (mssqlTr *MssqlTransformer) Consume(
	ctx context.Context,
	columns []models.ColumnType,
	retryConfig config.RetryConfig,
	batchSize int,
	chBatchesIn <-chan models.Batch,
	chBatchesOut chan<- models.Batch,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActiveBatches *sync.WaitGroup,
) {
	transformationPlan := computeTransformationPlan(columns)
	storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
	transformationPlan = append(transformationPlan, storagePlan...)

	// Accumulator state used only when batchSize > 0.
	var (
		accRows          []models.UnknownRowValues
		parentBatchesId  []uuid.UUID
		firstPartitionId uuid.UUID
	)

	// emit delivers b on chBatchesOut and reports whether it was delivered.
	//
	// The WaitGroup is incremented BEFORE the send: once the receiver owns the
	// batch it may call Done (or another goroutine may call Wait) at any time,
	// so an Add performed after the send could run too late and either let
	// Wait return early or drive the counter negative. If the send is
	// abandoned because ctx was cancelled, the increment is rolled back.
	emit := func(b models.Batch) bool {
		wgActiveBatches.Add(1)
		select {
		case chBatchesOut <- b:
			return true
		case <-ctx.Done():
			wgActiveBatches.Done()
			return false
		}
	}

	// flush emits the accumulated rows as a single merged batch and resets the
	// accumulator. It returns false only when ctx was cancelled before the
	// merged batch could be delivered; an empty accumulator is a no-op.
	flush := func() bool {
		if len(accRows) == 0 {
			return true
		}
		merged := models.Batch{
			Id:              uuid.New(),
			PartitionId:     firstPartitionId,
			ParentBatchesId: parentBatchesId,
			Rows:            accRows,
		}
		if !emit(merged) {
			return false
		}
		// Drop the references instead of truncating so the emitted batch keeps
		// exclusive ownership of its backing arrays.
		accRows = nil
		parentBatchesId = nil
		firstPartitionId = uuid.Nil
		return true
	}

	for {
		// Checked up front so cancellation wins even when chBatchesIn is also
		// ready (select picks randomly among ready cases).
		if ctx.Err() != nil {
			return
		}

		select {
		case <-ctx.Done():
			return

		case batch, ok := <-chBatchesIn:
			if !ok {
				// Input exhausted: emit whatever is still buffered. The result
				// is irrelevant because we return either way.
				flush()
				return
			}

			if len(transformationPlan) > 0 {
				if err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig); err != nil {
					// Failure caused by our own cancellation: nothing to report.
					if errors.Is(err, ctx.Err()) {
						return
					}

					// Forward a JobError produced downstream as-is; wrap any
					// other error in one that cancels the job.
					jobErr := custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}
					if typed, ok := errors.AsType[*custom_errors.JobError](err); ok {
						jobErr = *typed
					}

					select {
					case chJobErrorsOut <- jobErr:
					case <-ctx.Done():
					}
					return
				}
			}

			// Pass-through mode: no re-batching requested.
			if batchSize <= 0 {
				if !emit(batch) {
					return
				}
				continue
			}

			// First batch of a new accumulation decides the partition of the
			// merged batch.
			if len(parentBatchesId) == 0 {
				firstPartitionId = batch.PartitionId
			}
			accRows = append(accRows, batch.Rows...)
			parentBatchesId = append(parentBatchesId, batch.Id)

			if len(accRows) >= batchSize && !flush() {
				return
			}
		}
	}
}