120 lines
2.7 KiB
Go
120 lines
2.7 KiB
Go
package transformers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type batchAccumulator struct {
|
|
batchSize int
|
|
rows []models.UnknownRowValues
|
|
parents []models.BatchRef
|
|
}
|
|
|
|
func (a *batchAccumulator) add(batch models.Batch) {
|
|
a.rows = append(a.rows, batch.Rows...)
|
|
a.parents = append(a.parents, models.BatchRef{Id: batch.Id})
|
|
}
|
|
|
|
func (a *batchAccumulator) ready() bool {
|
|
return len(a.rows) >= a.batchSize
|
|
}
|
|
|
|
func (a *batchAccumulator) flush(ctx context.Context, chOut chan<- models.Batch, wg *sync.WaitGroup) bool {
|
|
if len(a.rows) == 0 {
|
|
return true
|
|
}
|
|
out := models.Batch{
|
|
Id: uuid.New(),
|
|
ParentBatches: a.parents,
|
|
Rows: a.rows,
|
|
}
|
|
wg.Add(1)
|
|
select {
|
|
case chOut <- out:
|
|
case <-ctx.Done():
|
|
wg.Done()
|
|
return false
|
|
}
|
|
a.rows = nil
|
|
a.parents = nil
|
|
return true
|
|
}
|
|
|
|
func sendTransformError(ctx context.Context, err error, ch chan<- custom_errors.JobError) {
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
|
return
|
|
}
|
|
var jobErr custom_errors.JobError
|
|
if je, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
|
jobErr = *je
|
|
} else {
|
|
jobErr = custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}
|
|
}
|
|
select {
|
|
case ch <- jobErr:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
|
|
func (mssqlTr *MssqlTransformer) Consume(
|
|
ctx context.Context,
|
|
columns []models.ColumnType,
|
|
retryConfig config.RetryConfig,
|
|
batchSize int,
|
|
chBatchesIn <-chan models.Batch,
|
|
chBatchesOut chan<- models.Batch,
|
|
chJobErrorsOut chan<- custom_errors.JobError,
|
|
wgActiveBatches *sync.WaitGroup,
|
|
) {
|
|
transformationPlan := computeTransformationPlan(columns)
|
|
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
|
|
transformationPlan = append(transformationPlan, storagePlan...)
|
|
|
|
acc := &batchAccumulator{batchSize: batchSize}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case batch, ok := <-chBatchesIn:
|
|
if !ok {
|
|
acc.flush(ctx, chBatchesOut, wgActiveBatches)
|
|
return
|
|
}
|
|
|
|
if len(transformationPlan) > 0 {
|
|
if err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig); err != nil {
|
|
sendTransformError(ctx, err, chJobErrorsOut)
|
|
return
|
|
}
|
|
}
|
|
|
|
if batchSize <= 0 {
|
|
wgActiveBatches.Add(1)
|
|
select {
|
|
case chBatchesOut <- batch:
|
|
case <-ctx.Done():
|
|
wgActiveBatches.Done()
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
|
|
acc.add(batch)
|
|
if acc.ready() {
|
|
if !acc.flush(ctx, chBatchesOut, wgActiveBatches) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|