11 Commits

10 changed files with 618 additions and 262 deletions

View File

@@ -1,7 +1,9 @@
package main
import (
"context"
"fmt"
"sync"
"github.com/google/uuid"
)
@@ -19,15 +21,41 @@ func (e *ExtractorError) Error() string {
const maxRetryAttempts = 3
func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chJobErrorsOut chan<- JobError) {
for err := range chErrorsIn {
func extractorErrorHandler(
ctx context.Context,
chErrorsIn <-chan ExtractorError,
chBatchesOut chan<- Batch,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.RetryCounter >= maxRetryAttempts {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
Prev: &err,
}
chJobErrorsOut <- jobError
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
wgActiveBatches.Done()
continue
}
@@ -41,7 +69,12 @@ func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<-
newBatch.IsLowerLimitInclusive = false
}
chBatchesOut <- newBatch
select {
case chBatchesOut <- newBatch:
case <-ctx.Done():
return
}
}
}
}
@@ -52,20 +85,18 @@ func ExtractorErrorFromLastRowMssql(lastRow UnknownRowValues, indexPrimaryKey in
if !ok {
currentBatch := *batch
currentBatch.RetryCounter = maxRetryAttempts
exError := ExtractorError{
return ExtractorError{
Batch: currentBatch,
HasLastId: true,
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
}
return exError
}
exError := ExtractorError{
return ExtractorError{
Batch: *batch,
HasLastId: true,
LastId: lastId,
Msg: previousError.Error(),
}
return exError
}

View File

@@ -3,10 +3,13 @@ package main
import (
"context"
"database/sql"
"errors"
"slices"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
@@ -14,6 +17,13 @@ import (
type UnknownRowValues = []any
// Chunk is the unit of data flowing through the pipeline: a slice of rows
// extracted from one batch, tagged with identifiers so failed loads can be
// retried (or given up on) per chunk rather than per batch.
type Chunk struct {
// Id uniquely identifies this chunk; used in retry bookkeeping and error messages.
Id uuid.UUID
// BatchId links the chunk back to the extraction batch it was read from.
BatchId uuid.UUID
// Data holds the raw row values scanned from the source database.
Data []UnknownRowValues
// RetryCounter counts load attempts so far (starts at 0 when the chunk is
// created); the loader error handler stops retrying at maxRetryAttempts.
RetryCounter int
}
func extractFromMssql(
ctx context.Context,
db *sql.DB,
@@ -21,25 +31,62 @@ func extractFromMssql(
columns []ColumnType,
chunkSize int,
chBatchesIn <-chan Batch,
chChunksOut chan<- []UnknownRowValues,
chChunksOut chan<- Chunk,
chErrorsOut chan<- ExtractorError,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
return strings.EqualFold(col.name, job.PrimaryKey)
})
if indexPrimaryKey == -1 {
exError := JobError{
jobError := JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}
chJobErrorsOut <- exError
select {
case <-ctx.Done():
return
case chJobErrorsOut <- jobError:
}
return
}
for batch := range chBatchesIn {
func() {
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort {
return
}
}
}
}
func processBatch(
ctx context.Context,
db *sql.DB,
job MigrationJob,
columns []ColumnType,
chunkSize int,
batch Batch,
indexPrimaryKey int,
chChunksOut chan<- Chunk,
chErrorsOut chan<- ExtractorError,
wgActiveBatches *sync.WaitGroup,
) (abort bool) {
query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
log.Debug("Query used to extract data from mssql: ", query)
@@ -54,13 +101,12 @@ func extractFromMssql(
queryStartTime := time.Now()
rows, err := db.QueryContext(ctx, query, queryArgs...)
if err != nil {
exError := ExtractorError{
Batch: batch,
HasLastId: false,
Msg: err.Error(),
select {
case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
case <-ctx.Done():
return true
}
chErrorsOut <- exError
return
return false
}
defer rows.Close()
log.Debugf("Query executed in %v", time.Since(queryStartTime))
@@ -79,19 +125,28 @@ func extractFromMssql(
if err := rows.Scan(scanArgs...); err != nil {
if len(rowsChunk) == 0 {
exError := ExtractorError{
Batch: batch,
HasLastId: false,
Msg: err.Error(),
select {
case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
case <-ctx.Done():
return true
}
chErrorsOut <- exError
return
return false
}
lastRow := rowsChunk[len(rowsChunk)-1]
chChunksOut <- rowsChunk
chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
return
select {
case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err):
case <-ctx.Done():
return true
}
select {
case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
case <-ctx.Done():
return true
}
return false
}
rowsChunk = append(rowsChunk, values)
@@ -100,39 +155,55 @@ func extractFromMssql(
if len(rowsChunk) >= chunkSize {
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(chunkSize) / chunkDuration.Seconds()
log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
chChunksOut <- rowsChunk
log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
select {
case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
case <-ctx.Done():
return true
}
rowsChunk = make([]UnknownRowValues, 0, chunkSize)
chunkStartTime = time.Now()
}
}
if len(rowsChunk) > 0 {
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
chChunksOut <- rowsChunk
if err := rows.Err(); err != nil {
if errors.Is(err, ctx.Err()) {
return true
}
if err := rows.Err(); err != nil {
if len(rowsChunk) == 0 {
exError := ExtractorError{
Batch: batch,
HasLastId: false,
Msg: err.Error(),
select {
case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
case <-ctx.Done():
return true
}
chErrorsOut <- exError
return
return false
}
lastRow := rowsChunk[len(rowsChunk)-1]
chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
return
select {
case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err):
case <-ctx.Done():
return true
}
}()
return false
}
if len(rowsChunk) > 0 {
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
select {
case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
case <-ctx.Done():
return true
}
}
wgActiveBatches.Done()
return false
}
func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error {

View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
@@ -20,14 +21,27 @@ func (e *JobError) Error() string {
return e.Msg
}
func jobErrorHandler(chErrorsIn <-chan JobError) error {
for err := range chErrorsIn {
func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
for {
if ctx.Err() != nil {
return nil
}
select {
case <-ctx.Done():
return nil
case err, ok := <-chErrorsIn:
if !ok {
return nil
}
if err.ShouldCancelJob {
log.Error(err.Msg, " - ", err.Prev)
return &err
}
log.Error(err)
log.Error(err.Msg, " - ", err.Prev)
}
}
return nil
}

View File

@@ -0,0 +1,65 @@
package main
import (
"context"
"fmt"
"sync"
)
// LoaderError describes a failed attempt to load one chunk into the target
// database. It embeds the Chunk itself so the error handler can re-queue the
// exact same data for another attempt.
type LoaderError struct {
// Chunk is the payload that failed to load (embedded, so Id/RetryCounter are promoted).
Chunk
// Msg is the human-readable reason the load failed.
Msg string
}
// Error implements the built-in error interface by returning the
// human-readable message attached to the failed chunk.
func (le *LoaderError) Error() string {
	return le.Msg
}
// loaderErrorHandler consumes load failures and decides, per failed chunk,
// whether to retry it or give up.
//
// A chunk whose RetryCounter has reached maxRetryAttempts is reported on
// chJobErrorsOut as a non-cancelling JobError and its slot in wgActiveChunks
// is released; otherwise the counter is bumped and the chunk is put back on
// chChunksOut for another load attempt. The handler exits when ctx is
// cancelled or chErrorsIn is closed. Every channel send is guarded by
// ctx.Done() so it can never block forever during shutdown.
func loaderErrorHandler(
	ctx context.Context,
	chErrorsIn <-chan LoaderError,
	chChunksOut chan<- Chunk,
	chJobErrorsOut chan<- JobError,
	wgActiveChunks *sync.WaitGroup,
) {
	for {
		// Prefer cancellation over draining more errors when both are ready.
		if ctx.Err() != nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case loadErr, open := <-chErrorsIn:
			if !open {
				return
			}
			if loadErr.RetryCounter < maxRetryAttempts {
				// Retry budget left: bump the counter and re-queue the chunk.
				loadErr.RetryCounter++
				select {
				case chChunksOut <- loadErr.Chunk:
				case <-ctx.Done():
					return
				}
				continue
			}
			// Retries exhausted: surface a non-cancelling job error and
			// release this chunk's slot in the active-chunks WaitGroup.
			report := JobError{
				ShouldCancelJob: false,
				Msg:             fmt.Sprintf("chunk %v reached max retries (%d)", loadErr.Id, maxRetryAttempts),
				Prev:            &loadErr,
			}
			select {
			case chJobErrorsOut <- report:
			case <-ctx.Done():
				return
			}
			wgActiveChunks.Done()
		}
	}
}

View File

@@ -3,61 +3,103 @@ package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
mssql "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
for rows := range in {
log.Debugf("Chunk received, loading data into...")
for i, rowValues := range rows {
if i%100 == 0 {
logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
}
}
}
}
func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error {
chunkCount := 0
totalRowsLoaded := 0
for rows := range in {
chunkStartTime := time.Now()
identifier := pgx.Identifier{job.Schema, job.Table}
func loadRowsPostgres(
ctx context.Context,
db *pgxpool.Pool,
job MigrationJob,
columns []ColumnType,
chChunksIn <-chan Chunk,
chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup,
) {
tableId := pgx.Identifier{job.Schema, job.Table}
colNames := Map(columns, func(col ColumnType) string {
return col.name
})
copyStartTime := time.Now()
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case chunk, ok := <-chChunksIn:
if !ok {
return
}
if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks); abort {
return
}
}
}
}
func loadChunkPostgres(
ctx context.Context,
db *pgxpool.Pool,
identifier pgx.Identifier,
colNames []string,
chunk Chunk,
chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup,
) (abort bool) {
chunkStartTime := time.Now()
_, err := db.CopyFrom(
ctx,
identifier,
colNames,
pgx.CopyFromRows(rows),
pgx.CopyFromRows(chunk.Data),
)
if err != nil {
return err
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == "23505" {
select {
case chJobErrorsOut <- JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal data integrity error in table %s", identifier.Sanitize()),
Prev: err,
}:
case <-ctx.Done():
}
wgActiveChunks.Done()
return true
}
}
select {
case chErrorsOut <- LoaderError{Chunk: chunk, Msg: err.Error()}:
case <-ctx.Done():
return true
}
return false
}
chunkCount++
totalRowsLoaded += len(rows)
copyDuration := time.Since(copyStartTime)
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds()
log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
}
log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec)
return nil
wgActiveChunks.Done()
return false
}
func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error {
@@ -134,3 +176,16 @@ func Map[T any, V any](input []T, mapper func(T) V) []V {
return result
}
// fakeLoader drains incoming chunks without writing them anywhere — useful
// for exercising the extract/transform stages in isolation. It logs a sample
// of every 100th row via logSampleRow and returns once in is closed.
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
	for chunkRows := range in {
		log.Debugf("Chunk received, loading data into...")
		for idx := range chunkRows {
			if idx%100 != 0 {
				continue
			}
			logSampleRow(job, columns, chunkRows[idx], fmt.Sprintf("row %d", idx))
		}
	}
}

View File

@@ -13,5 +13,5 @@ func configureLog() {
DisableSorting: false,
PadLevelText: true,
})
log.SetLevel(log.DebugLevel)
log.SetLevel(log.InfoLevel)
}

View File

@@ -1,6 +1,7 @@
package main
import (
"context"
"time"
log "github.com/sirupsen/logrus"
@@ -37,6 +38,10 @@ const (
func main() {
configureLog()
startTime := time.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Info("=== Starting migration ===")
log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize)
@@ -50,7 +55,7 @@ func main() {
for _, job := range migrationJobs {
log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table)
processMigrationJob(sourceDb, targetDb, job)
processMigrationJob(ctx, sourceDb, targetDb, job)
}
totalDuration := time.Since(startTime)

View File

@@ -2,27 +2,29 @@ package main
import (
"encoding/binary"
"errors"
"time"
)
func mssqlUuidToBigEndian(mssqlUuid []byte) []byte {
func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
if len(mssqlUuid) != 16 {
return mssqlUuid
return nil, errors.New("Invalid uuid")
}
pgUuid := make([]byte, 16)
pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0]
pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4]
pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6]
copy(pgUuid[8:], mssqlUuid[8:])
return pgUuid
return pgUuid, nil
}
const sridFlag = 0x20000000
func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
func wkbToEwkbWithSrid(geometry []byte, srid int) ([]byte, error) {
if len(geometry) < 5 {
return geometry
return nil, errors.New("Invalid wkb")
}
var byteOrder binary.ByteOrder
@@ -34,7 +36,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
wkbType := byteOrder.Uint32(geometry[1:5])
if wkbType&sridFlag != 0 {
return geometry
return geometry, nil
}
ewkbType := wkbType | sridFlag
@@ -49,7 +51,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
copy(result[9:], geometry[5:])
return result
return result, nil
}
func ensureUTC(t time.Time) time.Time {
@@ -59,3 +61,20 @@ func ensureUTC(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
// ToInt64 converts any signed integer value (int, int8, int16, int32, int64)
// to int64. It returns the converted value and true on success, or 0 and
// false when v is not one of those types (including unsigned ints, floats,
// and nil).
func ToInt64(v any) (int64, bool) {
	switch t := v.(type) {
	case int:
		return int64(t), true
	case int8:
		return int64(t), true
	case int16:
		return int64(t), true
	case int32:
		return int64(t), true
	case int64:
		// Already the target type: no conversion needed (the original
		// int64(t) here was a redundant conversion).
		return t, true
	default:
		return 0, false
	}
}

View File

@@ -12,7 +12,12 @@ import (
log "github.com/sirupsen/logrus"
)
func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job MigrationJob) {
func processMigrationJob(
ctx context.Context,
sourceDb *sql.DB,
targetDb *pgxpool.Pool,
job MigrationJob,
) {
jobStartTime := time.Now()
log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
@@ -24,91 +29,95 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
logColumnTypes(sourceColTypes, "Source col types")
logColumnTypes(targetColTypes, "Target col types")
ctx, cancel := context.WithCancel(context.Background())
jobCtx, cancel := context.WithCancel(ctx)
defer cancel()
batches, err := batchGeneratorMssql(ctx, sourceDb, job)
batches, err := batchGeneratorMssql(jobCtx, sourceDb, job)
if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err)
}
chJobErrors := make(chan JobError)
defer close(chJobErrors)
chJobErrors := make(chan JobError, 50)
chBatches := make(chan Batch, QueueSize)
chExtractorErrors := make(chan ExtractorError, QueueSize)
chChunksRaw := make(chan Chunk, QueueSize)
chChunksTransformed := make(chan Chunk, QueueSize)
chLoadersErrors := make(chan LoaderError, QueueSize)
var wgActiveBatches sync.WaitGroup
var wgActiveChunks sync.WaitGroup
var wgExtractors sync.WaitGroup
var wgTransformers sync.WaitGroup
var wgLoaders sync.WaitGroup
go func() {
if err := jobErrorHandler(chJobErrors); err != nil {
if err := jobErrorHandler(jobCtx, chJobErrors); err != nil {
cancel()
}
}()
chBatches := make(chan Batch, len(batches))
chExtractorErrors := make(chan ExtractorError, len(batches))
go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
go func() {
extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors)
}()
chChunks := make(chan []UnknownRowValues, QueueSize)
maxExtractors := min(NumExtractors, len(batches))
var wgMssqlExtractors sync.WaitGroup
log.Infof("Starting %d MSSQL extractors...", maxExtractors)
log.Infof("Starting %d extractors...", maxExtractors)
extractStartTime := time.Now()
for range maxExtractors {
wgMssqlExtractors.Go(func() {
extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors, chJobErrors)
wgExtractors.Go(func() {
extractFromMssql(jobCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
})
}
wgActiveBatches.Add(len(batches))
go func() {
for _, br := range batches {
chBatches <- br
for _, batch := range batches {
chBatches <- batch
}
close(chBatches)
close(chExtractorErrors)
}()
go func() {
wgMssqlExtractors.Wait()
close(chChunks)
log.Infof("Extraction completed in %v", time.Since(extractStartTime))
}()
chRowsTransform := make(chan []UnknownRowValues, QueueSize)
var wgMssqlTransformers sync.WaitGroup
log.Infof("Starting %d MSSQL transformers...", maxExtractors)
log.Infof("Starting %d transformers...", maxExtractors)
transformStartTime := time.Now()
for range maxExtractors {
wgMssqlTransformers.Go(func() {
transformRowsMssql(sourceColTypes, chChunks, chRowsTransform)
wgTransformers.Go(func() {
transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks)
})
}
go func() {
wgMssqlTransformers.Wait()
close(chRowsTransform)
log.Infof("Transformation completed in %v", time.Since(transformStartTime))
}()
var wgPostgresLoaders sync.WaitGroup
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
loaderStartTime := time.Now()
loadStartTime := time.Now()
for range NumLoaders {
wgPostgresLoaders.Go(func() {
if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
log.Error("Unexpected error loading data into postgres: ", err)
}
wgLoaders.Go(func() {
loadRowsPostgres(jobCtx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
})
}
wgPostgresLoaders.Wait()
log.Infof("Loading completed in %v", time.Since(loaderStartTime))
go func() {
wgActiveBatches.Wait()
close(chBatches)
close(chExtractorErrors)
totalDuration := time.Since(jobStartTime)
log.Infof("Migration job completed successfully! Total time: %v", totalDuration)
wgExtractors.Wait()
log.Infof("Extraction completed in %v", time.Since(extractStartTime))
close(chChunksRaw)
wgTransformers.Wait()
log.Infof("Transformation completed in %v", time.Since(transformStartTime))
wgActiveChunks.Wait()
close(chChunksTransformed)
close(chLoadersErrors)
wgLoaders.Wait()
log.Infof("Loading completed in %v", time.Since(loadStartTime))
cancel()
}()
<-jobCtx.Done()
log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime))
}
func logColumnTypes(columnTypes []ColumnType, label string) {

View File

@@ -1,62 +1,149 @@
package main
import (
"context"
"errors"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
chunkCount := 0
totalRowsTransformed := 0
// transformerFunc converts a single column value, returning either the
// transformed value or an error.
type transformerFunc func(any) (any, error)
// columnTransformPlan binds a transformer to the row index of the column it
// applies to, so only columns that need conversion are touched per row.
type columnTransformPlan struct {
// index is the position of the column within a row's value slice.
index int
// fn performs the per-value conversion for that column.
fn transformerFunc
}
func transformRowsMssql(
ctx context.Context,
columns []ColumnType,
chChunksIn <-chan Chunk,
chChunksOut chan<- Chunk,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup,
) {
transformationPlan := computeTransformationPlan(columns)
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case chunk, ok := <-chChunksIn:
if !ok {
return
}
if len(transformationPlan) == 0 {
select {
case chChunksOut <- chunk:
wgActiveChunks.Add(1)
continue
case <-ctx.Done():
return
}
}
for rows := range in {
chunkStartTime := time.Now()
log.Debugf("Chunk #%d received, transforming %d rows...", chunkCount+1, len(rows))
for _, rowValues := range rows {
err := processChunk(ctx, &chunk, transformationPlan)
if err != nil {
if errors.Is(err, ctx.Err()) {
return
}
select {
case chJobErrorsOut <- JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
}
return
}
log.Infof("Transformed chunk %s: %d rows in %v", chunk.Id, len(chunk.Data), time.Since(chunkStartTime))
select {
case chChunksOut <- chunk:
case <-ctx.Done():
return
}
wgActiveChunks.Add(1)
}
}
}
func computeTransformationPlan(columns []ColumnType) []columnTransformPlan {
var plan []columnTransformPlan
for i, col := range columns {
value := rowValues[i]
if col.SystemType() == "uniqueidentifier" {
if b, ok := value.([]byte); ok {
rowValues[i] = mssqlUuidToBigEndian(b)
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, columnTransformPlan{
index: i,
fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
} else if col.SystemType() == "geometry" || col.SystemType() == "geography" {
if b, ok := value.([]byte); ok {
rowValues[i] = wkbToEwkbWithSrid(b, 4326)
}
} else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" {
if t, ok := value.(time.Time); ok {
rowValues[i] = ensureUTC(t)
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, columnTransformPlan{
index: i,
fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, columnTransformPlan{
index: i,
fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
chunkCount++
totalRowsTransformed += len(rows)
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
log.Infof("Transformed chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows",
chunkCount, len(rows), chunkDuration, rowsPerSec, totalRowsTransformed)
return plan
}
out <- rows
const processChunkCtxCheck = 4096
func processChunk(ctx context.Context, chunk *Chunk, transformationPlan []columnTransformPlan) error {
for i, rowValues := range chunk.Data {
if i%processChunkCtxCheck == 0 {
if err := ctx.Err(); err != nil {
return err
}
}
func ToInt64(v any) (int64, bool) {
switch t := v.(type) {
case int:
return int64(t), true
case int8:
return int64(t), true
case int16:
return int64(t), true
case int32:
return int64(t), true
case int64:
return int64(t), true
default:
return 0, false
for _, task := range transformationPlan {
val := rowValues[task.index]
if val == nil {
continue
}
transformed, err := task.fn(val)
if err != nil {
return err
}
rowValues[task.index] = transformed
}
}
return nil
}