package transformers import ( "context" "errors" "sync" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" ) type MssqlTransformer struct{} func NewMssqlTransformer() etl.Transformer { return &MssqlTransformer{} } func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan { return []etl.ColumnTransformPlan{} } const processBatchCtxCheck = 4096 func (mssqlTr *MssqlTransformer) ProcessBatch( ctx context.Context, batch *models.Batch, transformationPlan []etl.ColumnTransformPlan, ) error { for i, rowValues := range batch.Rows { if i%processBatchCtxCheck == 0 { if err := ctx.Err(); err != nil { return err } } for _, task := range transformationPlan { val := rowValues[task.Index] if val == nil { continue } transformed, err := task.Fn(val) if err != nil { return err } rowValues[task.Index] = transformed } } return nil } func (mssqlTr *MssqlTransformer) Exec( ctx context.Context, columns []models.ColumnType, chBatchesIn <-chan models.Batch, chBatchesOut chan<- models.Batch, chJobErrorsOut chan<- custom_errors.JobError, wgActiveBatches *sync.WaitGroup, ) { transformationPlan := computeTransformationPlan(columns) for { if ctx.Err() != nil { return } select { case <-ctx.Done(): return case batch, ok := <-chBatchesIn: if !ok { return } if len(transformationPlan) == 0 { select { case chBatchesOut <- batch: wgActiveBatches.Add(1) continue case <-ctx.Done(): return } } err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan) if err != nil { if errors.Is(err, ctx.Err()) { return } select { case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: case <-ctx.Done(): } return } select { case chBatchesOut <- batch: case <-ctx.Done(): return } wgActiveBatches.Add(1) } } }