package transformers import ( "context" "errors" "sync" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" ) type batchAccumulator struct { batchSize int rows []models.UnknownRowValues parents []models.BatchRef } func (a *batchAccumulator) add(batch models.Batch) { a.rows = append(a.rows, batch.Rows...) a.parents = append(a.parents, models.BatchRef{Id: batch.Id}) } func (a *batchAccumulator) ready() bool { return len(a.rows) >= a.batchSize } func (a *batchAccumulator) flush(ctx context.Context, chOut chan<- models.Batch, wg *sync.WaitGroup) bool { if len(a.rows) == 0 { return true } out := models.Batch{ Id: uuid.New(), ParentBatches: a.parents, Rows: a.rows, } wg.Add(1) select { case chOut <- out: case <-ctx.Done(): wg.Done() return false } a.rows = nil a.parents = nil return true } func sendTransformError(ctx context.Context, err error, ch chan<- custom_errors.JobError) { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } var jobErr custom_errors.JobError if je, ok := errors.AsType[*custom_errors.JobError](err); ok { jobErr = *je } else { jobErr = custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err} } select { case ch <- jobErr: case <-ctx.Done(): } } func (mssqlTr *MssqlTransformer) Consume( ctx context.Context, columns []models.ColumnType, retryConfig config.RetryConfig, batchSize int, chBatchesIn <-chan models.Batch, chBatchesOut chan<- models.Batch, chJobErrorsOut chan<- custom_errors.JobError, wgActiveBatches *sync.WaitGroup, ) { transformationPlan := computeTransformationPlan(columns) storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable) transformationPlan = append(transformationPlan, storagePlan...) acc := &batchAccumulator{batchSize: batchSize} for { select { case <-ctx.Done(): return case batch, ok := <-chBatchesIn: if !ok { acc.flush(ctx, chBatchesOut, wgActiveBatches) return } if len(transformationPlan) > 0 { if err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig); err != nil { sendTransformError(ctx, err, chJobErrorsOut) return } } if batchSize <= 0 { wgActiveBatches.Add(1) select { case chBatchesOut <- batch: case <-ctx.Done(): wgActiveBatches.Done() return } continue } acc.add(batch) if acc.ready() { if !acc.flush(ctx, chBatchesOut, wgActiveBatches) { return } } } } }