Files
go-migrate/internal/app/etl/transformers/consume.go

120 lines
2.7 KiB
Go

package transformers
import (
"context"
"errors"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type batchAccumulator struct {
batchSize int
rows []models.UnknownRowValues
parents []models.BatchRef
}
func (a *batchAccumulator) add(batch models.Batch) {
a.rows = append(a.rows, batch.Rows...)
a.parents = append(a.parents, models.BatchRef{Id: batch.Id})
}
func (a *batchAccumulator) ready() bool {
return len(a.rows) >= a.batchSize
}
func (a *batchAccumulator) flush(ctx context.Context, chOut chan<- models.Batch, wg *sync.WaitGroup) bool {
if len(a.rows) == 0 {
return true
}
out := models.Batch{
Id: uuid.New(),
ParentBatches: a.parents,
Rows: a.rows,
}
wg.Add(1)
select {
case chOut <- out:
case <-ctx.Done():
wg.Done()
return false
}
a.rows = nil
a.parents = nil
return true
}
func sendTransformError(ctx context.Context, err error, ch chan<- custom_errors.JobError) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
var jobErr custom_errors.JobError
if je, ok := errors.AsType[*custom_errors.JobError](err); ok {
jobErr = *je
} else {
jobErr = custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}
}
select {
case ch <- jobErr:
case <-ctx.Done():
}
}
func (mssqlTr *MssqlTransformer) Consume(
ctx context.Context,
columns []models.ColumnType,
retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
) {
transformationPlan := computeTransformationPlan(columns)
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
transformationPlan = append(transformationPlan, storagePlan...)
acc := &batchAccumulator{batchSize: batchSize}
for {
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
acc.flush(ctx, chBatchesOut, wgActiveBatches)
return
}
if len(transformationPlan) > 0 {
if err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig); err != nil {
sendTransformError(ctx, err, chJobErrorsOut)
return
}
}
if batchSize <= 0 {
wgActiveBatches.Add(1)
select {
case chBatchesOut <- batch:
case <-ctx.Done():
wgActiveBatches.Done()
return
}
continue
}
acc.add(batch)
if acc.ready() {
if !acc.flush(ctx, chBatchesOut, wgActiveBatches) {
return
}
}
}
}
}