diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go
index 27e82ad..08efbf2 100644
--- a/cmd/go_migrate/process.go
+++ b/cmd/go_migrate/process.go
@@ -81,7 +81,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
 	transformStartTime := time.Now()
 	for range maxExtractors {
 		wgMssqlTransformers.Go(func() {
-			transformRowsMssql(sourceColTypes, chChunks, chChunksTransform, chJobErrors)
+			transformRowsMssql(ctx, sourceColTypes, chChunks, chChunksTransform, chJobErrors)
 		})
 	}
 
diff --git a/cmd/go_migrate/transformer.go b/cmd/go_migrate/transformer.go
index fc39349..184e5a6 100644
--- a/cmd/go_migrate/transformer.go
+++ b/cmd/go_migrate/transformer.go
@@ -1,72 +1,144 @@
 package main
 
 import (
+	"context"
+	"errors"
 	"time"
 
 	log "github.com/sirupsen/logrus"
 )
 
+type transformerFunc func(any) (any, error)
+
+type columnTransformPlan struct {
+	index int
+	fn    transformerFunc
+}
+
 func transformRowsMssql(
+	ctx context.Context,
 	columns []ColumnType,
 	chChunksIn <-chan Chunk,
 	chChunksOut chan<- Chunk,
 	chJobErrorsOut chan<- JobError,
 ) {
-	chunkCount := 0
-	totalRowsTransformed := 0
+	transformationPlan := computeTransformationPlan(columns)
 
-	for chunk := range chChunksIn {
-		chunkStartTime := time.Now()
-		log.Debugf("Chunk received, transforming %d rows...", len(chunk.Data))
+	for {
+		if ctx.Err() != nil {
+			return
+		}
 
-		for _, rowValues := range chunk.Data {
-			for i, col := range columns {
-				value := rowValues[i]
+		select {
+		case <-ctx.Done():
+			return
 
-				switch col.SystemType() {
-				case "uniqueidentifier":
-					if b, ok := value.([]byte); ok {
-						pgUuid, err := mssqlUuidToBigEndian(b)
-						if err != nil {
-							jobError := JobError{
-								ShouldCancelJob: true,
-								Prev:            err,
-							}
-							chJobErrorsOut <- jobError
-							return
-						}
-						rowValues[i] = pgUuid
-					}
+		case chunk, ok := <-chChunksIn:
+			if !ok {
+				return
+			}
 
-				case "geometry", "geography":
-					if b, ok := value.([]byte); ok {
-						ewkb, err := wkbToEwkbWithSrid(b, 4326)
-						if err != nil {
-							jobError := JobError{
-								ShouldCancelJob: true,
-								Prev:            err,
-							}
-							chJobErrorsOut <- jobError
-							return
-						}
-						rowValues[i] = ewkb
-					}
-
-				case "datetime", "datetime2":
-					if t, ok := value.(time.Time); ok {
-						rowValues[i] = ensureUTC(t)
-					}
+			if len(transformationPlan) == 0 {
+				select {
+				case chChunksOut <- chunk:
+					continue
+				case <-ctx.Done():
+					return
 				}
 			}
+
+			chunkStartTime := time.Now()
+
+			err := processChunk(ctx, &chunk, transformationPlan)
+			if err != nil {
+				if errors.Is(err, ctx.Err()) {
+					return
+				}
+
+				select {
+				case chJobErrorsOut <- JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
+				case <-ctx.Done():
+				}
+				return
+			}
+
+			log.Infof("Transformed chunk %s: %d rows in %v", chunk.Id, len(chunk.Data), time.Since(chunkStartTime))
+
+			select {
+			case chChunksOut <- chunk:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+func computeTransformationPlan(columns []ColumnType) []columnTransformPlan {
+	var plan []columnTransformPlan
+
+	for i, col := range columns {
+		switch col.SystemType() {
+		case "uniqueidentifier":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if b, ok := v.([]byte); ok && b != nil {
+						return mssqlUuidToBigEndian(b)
+					}
+					return v, nil
+				},
+			})
+
+		case "geometry", "geography":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if b, ok := v.([]byte); ok && b != nil {
+						return wkbToEwkbWithSrid(b, 4326)
+					}
+					return v, nil
+				},
+			})
+
+		case "datetime", "datetime2":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if t, ok := v.(time.Time); ok {
+						return ensureUTC(t), nil
+					}
+					return v, nil
+				},
+			})
+		}
+	}
+
+	return plan
+}
+
+const processChunkCtxCheck = 4096
+
+func processChunk(ctx context.Context, chunk *Chunk, transformationPlan []columnTransformPlan) error {
+	for i, rowValues := range chunk.Data {
+		if i%processChunkCtxCheck == 0 {
+			if err := ctx.Err(); err != nil {
+				return err
+			}
 		}
-		chunkCount++
-		totalRowsTransformed += len(chunk.Data)
-		chunkDuration := time.Since(chunkStartTime)
-		rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds()
-		log.Infof("Transformed chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
-			len(chunk.Data), chunkDuration, rowsPerSec, totalRowsTransformed)
 
-		chChunksOut <- chunk
+		for _, task := range transformationPlan {
+			val := rowValues[task.index]
+			if val == nil {
+				continue
+			}
+
+			transformed, err := task.fn(val)
+			if err != nil {
+				return err
+			}
+			rowValues[task.index] = transformed
+		}
 	}
+
+	return nil
 }
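
Reviewer note: the patch replaces the per-row switch on col.SystemType() with a transformation plan computed once per table, and adds cooperative cancellation (a ctx check every processChunkCtxCheck rows, plus ctx-aware channel sends). Below is a minimal, self-contained sketch of that pattern, not the project's real code: the column types, the uppercase transform, and the names computePlan/applyPlan are stand-ins for illustration only (the real Chunk, ColumnType, and converter helpers are not shown in this diff).

package main

import (
	"context"
	"fmt"
	"strings"
)

// Stand-ins for the project types in the diff; names are illustrative only.
type transformerFunc func(any) (any, error)

type columnTransformPlan struct {
	index int
	fn    transformerFunc
}

// computePlan inspects column type names once and records one closure per
// column that needs rewriting, so the per-row loop never re-dispatches on type.
func computePlan(systemTypes []string) []columnTransformPlan {
	var plan []columnTransformPlan
	for i, t := range systemTypes {
		if t == "varchar" { // hypothetical transform, standing in for uuid/geometry/datetime
			plan = append(plan, columnTransformPlan{
				index: i,
				fn: func(v any) (any, error) {
					if s, ok := v.(string); ok {
						return strings.ToUpper(s), nil
					}
					return v, nil
				},
			})
		}
	}
	return plan
}

// applyPlan mirrors processChunk: periodic cancellation checks, NULLs skipped.
func applyPlan(ctx context.Context, rows [][]any, plan []columnTransformPlan) error {
	const ctxCheckEvery = 4096
	for i, row := range rows {
		if i%ctxCheckEvery == 0 {
			if err := ctx.Err(); err != nil {
				return err
			}
		}
		for _, task := range plan {
			val := row[task.index]
			if val == nil {
				continue // NULL columns pass through untouched
			}
			transformed, err := task.fn(val)
			if err != nil {
				return err
			}
			row[task.index] = transformed
		}
	}
	return nil
}

func main() {
	rows := [][]any{{"a", 1}, {"b", 2}, {nil, 3}}
	plan := computePlan([]string{"varchar", "int"})
	if err := applyPlan(context.Background(), rows, plan); err != nil {
		fmt.Println("transform failed:", err)
		return
	}
	fmt.Println(rows) // [[A 1] [B 2] [<nil> 3]]
}

Precomputing the closures keeps type dispatch out of the hot loop: columns with no transform cost nothing per row, and chunks for tables with an empty plan are forwarded untouched, as in the patched transformRowsMssql.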