11 Commits

10 changed files with 618 additions and 262 deletions

View File

@@ -1,7 +1,9 @@
 package main
 
 import (
+	"context"
 	"fmt"
+	"sync"
 
 	"github.com/google/uuid"
 )
@@ -19,29 +21,60 @@ func (e *ExtractorError) Error() string {
 const maxRetryAttempts = 3
 
-func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chJobErrorsOut chan<- JobError) {
-	for err := range chErrorsIn {
-		if err.RetryCounter >= maxRetryAttempts {
-			jobError := JobError{
-				ShouldCancelJob: false,
-				Msg:             fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
-				Prev:            &err,
-			}
-			chJobErrorsOut <- jobError
-			continue
-		}
-		newBatch := err.Batch
-		newBatch.RetryCounter++
-		if err.HasLastId {
-			newBatch.ParentId = err.Id
-			newBatch.Id = uuid.New()
-			newBatch.LowerLimit = err.LastId
-			newBatch.IsLowerLimitInclusive = false
-		}
-		chBatchesOut <- newBatch
+func extractorErrorHandler(
+	ctx context.Context,
+	chErrorsIn <-chan ExtractorError,
+	chBatchesOut chan<- Batch,
+	chJobErrorsOut chan<- JobError,
+	wgActiveBatches *sync.WaitGroup,
+) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case err, ok := <-chErrorsIn:
+			if !ok {
+				return
+			}
+			if err.RetryCounter >= maxRetryAttempts {
+				jobError := JobError{
+					ShouldCancelJob: false,
+					Msg:             fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
+					Prev:            &err,
+				}
+				select {
+				case chJobErrorsOut <- jobError:
+				case <-ctx.Done():
+					return
+				}
+				wgActiveBatches.Done()
+				continue
+			}
+			newBatch := err.Batch
+			newBatch.RetryCounter++
+			if err.HasLastId {
+				newBatch.ParentId = err.Id
+				newBatch.Id = uuid.New()
+				newBatch.LowerLimit = err.LastId
+				newBatch.IsLowerLimitInclusive = false
+			}
+			select {
+			case chBatchesOut <- newBatch:
+			case <-ctx.Done():
+				return
+			}
+		}
 	}
 }
@@ -52,20 +85,18 @@ func ExtractorErrorFromLastRowMssql(lastRow UnknownRowValues, indexPrimaryKey in
 	if !ok {
 		currentBatch := *batch
 		currentBatch.RetryCounter = maxRetryAttempts
-		exError := ExtractorError{
+		return ExtractorError{
 			Batch:     currentBatch,
 			HasLastId: true,
 			Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
 		}
-		return exError
 	}
-	exError := ExtractorError{
+	return ExtractorError{
 		Batch:     *batch,
 		HasLastId: true,
 		LastId:    lastId,
 		Msg:       previousError.Error(),
 	}
-	return exError
 }
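Reviewer note: every bare channel send in this handler now goes through a select paired with ctx.Done(), so a cancelled job can never leave the goroutine blocked on a full channel. A minimal standalone sketch of that pattern (trySend is an illustrative name, not part of this PR):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// trySend blocks until either the value is accepted or the context is
// cancelled, so a worker cannot deadlock on a full channel at shutdown.
func trySend[T any](ctx context.Context, ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	ch := make(chan int) // unbuffered and never read: a bare send would block forever
	fmt.Println(trySend(ctx, ch, 42)) // prints false once the context expires
}
```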

View File

@@ -3,10 +3,13 @@ package main
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"slices"
 	"strings"
+	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/jackc/pgx/v5/pgxpool"
 	_ "github.com/microsoft/go-mssqldb"
 	log "github.com/sirupsen/logrus"
@@ -14,6 +17,13 @@ import (
 type UnknownRowValues = []any
 
+type Chunk struct {
+	Id           uuid.UUID
+	BatchId      uuid.UUID
+	Data         []UnknownRowValues
+	RetryCounter int
+}
+
 func extractFromMssql(
 	ctx context.Context,
 	db *sql.DB,
@@ -21,120 +31,181 @@ func extractFromMssql(
 	columns []ColumnType,
 	chunkSize int,
 	chBatchesIn <-chan Batch,
-	chChunksOut chan<- []UnknownRowValues,
+	chChunksOut chan<- Chunk,
 	chErrorsOut chan<- ExtractorError,
 	chJobErrorsOut chan<- JobError,
+	wgActiveBatches *sync.WaitGroup,
 ) {
 	indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
 		return strings.EqualFold(col.name, job.PrimaryKey)
 	})
 	if indexPrimaryKey == -1 {
-		exError := JobError{
+		jobError := JobError{
 			ShouldCancelJob: true,
 			Msg:             "Primary key not found in provided columns",
 		}
-		chJobErrorsOut <- exError
+		select {
+		case <-ctx.Done():
+			return
+		case chJobErrorsOut <- jobError:
+		}
 		return
 	}
-	for batch := range chBatchesIn {
-		func() {
-			query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
-			log.Debug("Query used to extract data from mssql: ", query)
-			var queryArgs []any
-			if batch.ShouldUseRange {
-				queryArgs = append(queryArgs,
-					sql.Named("min", batch.LowerLimit),
-					sql.Named("max", batch.UpperLimit),
-				)
-			}
-			queryStartTime := time.Now()
-			rows, err := db.QueryContext(ctx, query, queryArgs...)
-			if err != nil {
-				exError := ExtractorError{
-					Batch:     batch,
-					HasLastId: false,
-					Msg:       err.Error(),
-				}
-				chErrorsOut <- exError
-				return
-			}
-			defer rows.Close()
-			log.Debugf("Query executed in %v", time.Since(queryStartTime))
-			rowsChunk := make([]UnknownRowValues, 0, chunkSize)
-			totalRowsExtracted := 0
-			chunkStartTime := time.Now()
-			for rows.Next() {
-				values := make([]any, len(columns))
-				scanArgs := make([]any, len(columns))
-				for i := range values {
-					scanArgs[i] = &values[i]
-				}
-				if err := rows.Scan(scanArgs...); err != nil {
-					if len(rowsChunk) == 0 {
-						exError := ExtractorError{
-							Batch:     batch,
-							HasLastId: false,
-							Msg:       err.Error(),
-						}
-						chErrorsOut <- exError
-						return
-					}
-					lastRow := rowsChunk[len(rowsChunk)-1]
-					chChunksOut <- rowsChunk
-					chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
-					return
-				}
-				rowsChunk = append(rowsChunk, values)
-				totalRowsExtracted++
-				if len(rowsChunk) >= chunkSize {
-					chunkDuration := time.Since(chunkStartTime)
-					rowsPerSec := float64(chunkSize) / chunkDuration.Seconds()
-					log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
-						len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
-					chChunksOut <- rowsChunk
-					rowsChunk = make([]UnknownRowValues, 0, chunkSize)
-					chunkStartTime = time.Now()
-				}
-			}
-			if len(rowsChunk) > 0 {
-				chunkDuration := time.Since(chunkStartTime)
-				rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
-				log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
-					len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
-				chChunksOut <- rowsChunk
-			}
-			if err := rows.Err(); err != nil {
-				if len(rowsChunk) == 0 {
-					exError := ExtractorError{
-						Batch:     batch,
-						HasLastId: false,
-						Msg:       err.Error(),
-					}
-					chErrorsOut <- exError
-					return
-				}
-				lastRow := rowsChunk[len(rowsChunk)-1]
-				chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
-				return
-			}
-		}()
-	}
-}
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case batch, ok := <-chBatchesIn:
+			if !ok {
+				return
+			}
+			if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort {
+				return
+			}
+		}
+	}
+}
+
+func processBatch(
+	ctx context.Context,
+	db *sql.DB,
+	job MigrationJob,
+	columns []ColumnType,
+	chunkSize int,
+	batch Batch,
+	indexPrimaryKey int,
+	chChunksOut chan<- Chunk,
+	chErrorsOut chan<- ExtractorError,
+	wgActiveBatches *sync.WaitGroup,
+) (abort bool) {
+	query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
+	log.Debug("Query used to extract data from mssql: ", query)
+	var queryArgs []any
+	if batch.ShouldUseRange {
+		queryArgs = append(queryArgs,
+			sql.Named("min", batch.LowerLimit),
+			sql.Named("max", batch.UpperLimit),
+		)
+	}
+	queryStartTime := time.Now()
+	rows, err := db.QueryContext(ctx, query, queryArgs...)
+	if err != nil {
+		select {
+		case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
+		case <-ctx.Done():
+			return true
+		}
+		return false
+	}
+	defer rows.Close()
+	log.Debugf("Query executed in %v", time.Since(queryStartTime))
+	rowsChunk := make([]UnknownRowValues, 0, chunkSize)
+	totalRowsExtracted := 0
+	chunkStartTime := time.Now()
+	for rows.Next() {
+		values := make([]any, len(columns))
+		scanArgs := make([]any, len(columns))
+		for i := range values {
+			scanArgs[i] = &values[i]
+		}
+		if err := rows.Scan(scanArgs...); err != nil {
+			if len(rowsChunk) == 0 {
+				select {
+				case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
+				case <-ctx.Done():
+					return true
+				}
+				return false
+			}
+			lastRow := rowsChunk[len(rowsChunk)-1]
+			select {
+			case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err):
+			case <-ctx.Done():
+				return true
+			}
+			select {
+			case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case <-ctx.Done():
+				return true
+			}
+			return false
+		}
+		rowsChunk = append(rowsChunk, values)
+		totalRowsExtracted++
+		if len(rowsChunk) >= chunkSize {
+			chunkDuration := time.Since(chunkStartTime)
+			rowsPerSec := float64(chunkSize) / chunkDuration.Seconds()
+			log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
+			select {
+			case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case <-ctx.Done():
+				return true
+			}
+			rowsChunk = make([]UnknownRowValues, 0, chunkSize)
+			chunkStartTime = time.Now()
+		}
+	}
+	if err := rows.Err(); err != nil {
+		if errors.Is(err, ctx.Err()) {
+			return true
+		}
+		if len(rowsChunk) == 0 {
+			select {
+			case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
+			case <-ctx.Done():
+				return true
+			}
+			return false
+		}
+		lastRow := rowsChunk[len(rowsChunk)-1]
+		select {
+		case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err):
+		case <-ctx.Done():
+			return true
+		}
+		return false
+	}
+	if len(rowsChunk) > 0 {
+		chunkDuration := time.Since(chunkStartTime)
+		rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
+		log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
+		select {
+		case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+		case <-ctx.Done():
+			return true
+		}
+	}
+	wgActiveBatches.Done()
+	return false
+}
 
 func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error {
 	query := buildExtractQueryPostgres(job, columns)
 	log.Debug("Query used to extract data from postgres: ", query)
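Reviewer note: the extractor scans each row into a generic []any by handing rows.Scan a parallel slice of pointers, since column types are only known at runtime. A reduced sketch of that inner-loop technique (scanRowGeneric is a hypothetical helper, not from this PR):

```go
package sketch

import "database/sql"

// scanRowGeneric reads one row into a fresh []any without knowing the
// column types at compile time, mirroring the extractor's scan loop:
// Scan requires pointers, so scanArgs[i] points at values[i].
func scanRowGeneric(rows *sql.Rows, numCols int) ([]any, error) {
	values := make([]any, numCols)
	scanArgs := make([]any, numCols)
	for i := range values {
		scanArgs[i] = &values[i]
	}
	if err := rows.Scan(scanArgs...); err != nil {
		return nil, err
	}
	return values, nil
}
```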

View File

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 
 	log "github.com/sirupsen/logrus"
@@ -16,18 +17,31 @@ func (e *JobError) Error() string {
 	if e.Prev != nil {
 		return fmt.Sprintf("%s: %v", e.Msg, e.Prev)
 	}
 	return e.Msg
 }
 
-func jobErrorHandler(chErrorsIn <-chan JobError) error {
-	for err := range chErrorsIn {
-		if err.ShouldCancelJob {
-			return &err
-		}
-		log.Error(err)
-	}
-
-	return nil
+func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
+	for {
+		if ctx.Err() != nil {
+			return nil
+		}
+		select {
+		case <-ctx.Done():
+			return nil
+		case err, ok := <-chErrorsIn:
+			if !ok {
+				return nil
+			}
+			if err.ShouldCancelJob {
+				log.Error(err.Msg, " - ", err.Prev)
+				return &err
+			}
+			log.Error(err.Msg, " - ", err.Prev)
+		}
+	}
 }

View File

@@ -0,0 +1,65 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"sync"
+)
+
+type LoaderError struct {
+	Chunk
+	Msg string
+}
+
+func (e *LoaderError) Error() string {
+	return e.Msg
+}
+
+func loaderErrorHandler(
+	ctx context.Context,
+	chErrorsIn <-chan LoaderError,
+	chChunksOut chan<- Chunk,
+	chJobErrorsOut chan<- JobError,
+	wgActiveChunks *sync.WaitGroup,
+) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case err, ok := <-chErrorsIn:
+			if !ok {
+				return
+			}
+			if err.RetryCounter >= maxRetryAttempts {
+				jobError := JobError{
+					ShouldCancelJob: false,
+					Msg:             fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts),
+					Prev:            &err,
+				}
+				select {
+				case chJobErrorsOut <- jobError:
+				case <-ctx.Done():
+					return
+				}
+				wgActiveChunks.Done()
+				continue
+			}
+			err.RetryCounter++
+			select {
+			case chChunksOut <- err.Chunk:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
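Reviewer note: LoaderError embeds the failed Chunk, so err.Id and err.RetryCounter above resolve through field promotion, and err.Chunk re-enqueues the original payload unchanged. A small sketch of that embedding pattern, with made-up types standing in for Chunk and LoaderError:

```go
package main

import "fmt"

type item struct {
	Id      int
	Retries int
}

// itemError wraps the failed work item; embedded fields promote, so the
// handler can read e.Id and e.Retries directly on the error.
type itemError struct {
	item
	Msg string
}

func (e *itemError) Error() string { return e.Msg }

func main() {
	e := itemError{item: item{Id: 7, Retries: 1}, Msg: "copy failed"}
	fmt.Println(e.Id, e.Retries) // promoted fields: 7 1

	requeue := e.item // the original work item, ready to re-enqueue
	requeue.Retries++
	fmt.Println(requeue.Retries) // 2
}
```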

View File

@@ -3,61 +3,103 @@ package main
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/jackc/pgx/v5/pgxpool"
 	mssql "github.com/microsoft/go-mssqldb"
 	log "github.com/sirupsen/logrus"
 )
 
-func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
-	for rows := range in {
-		log.Debugf("Chunk received, loading data into...")
-
-		for i, rowValues := range rows {
-			if i%100 == 0 {
-				logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
-			}
-		}
-	}
-}
-
-func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error {
-	chunkCount := 0
-	totalRowsLoaded := 0
-
-	for rows := range in {
-		chunkStartTime := time.Now()
-		identifier := pgx.Identifier{job.Schema, job.Table}
-		colNames := Map(columns, func(col ColumnType) string {
-			return col.name
-		})
-
-		copyStartTime := time.Now()
-		_, err := db.CopyFrom(
-			ctx,
-			identifier,
-			colNames,
-			pgx.CopyFromRows(rows),
-		)
-
-		if err != nil {
-			return err
-		}
-		chunkCount++
-		totalRowsLoaded += len(rows)
-		copyDuration := time.Since(copyStartTime)
-		chunkDuration := time.Since(chunkStartTime)
-		rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
-
-		log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
-	}
-	return nil
+func loadRowsPostgres(
+	ctx context.Context,
+	db *pgxpool.Pool,
+	job MigrationJob,
+	columns []ColumnType,
+	chChunksIn <-chan Chunk,
+	chErrorsOut chan<- LoaderError,
+	chJobErrorsOut chan<- JobError,
+	wgActiveChunks *sync.WaitGroup,
+) {
+	tableId := pgx.Identifier{job.Schema, job.Table}
+	colNames := Map(columns, func(col ColumnType) string {
+		return col.name
+	})
+
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case chunk, ok := <-chChunksIn:
+			if !ok {
+				return
+			}
+			if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks); abort {
+				return
+			}
+		}
+	}
+}
+
+func loadChunkPostgres(
+	ctx context.Context,
+	db *pgxpool.Pool,
+	identifier pgx.Identifier,
+	colNames []string,
+	chunk Chunk,
+	chErrorsOut chan<- LoaderError,
+	chJobErrorsOut chan<- JobError,
+	wgActiveChunks *sync.WaitGroup,
+) (abort bool) {
+	chunkStartTime := time.Now()
+	_, err := db.CopyFrom(
+		ctx,
+		identifier,
+		colNames,
+		pgx.CopyFromRows(chunk.Data),
+	)
+	if err != nil {
+		var pgErr *pgconn.PgError
+		if errors.As(err, &pgErr) {
+			if pgErr.Code == "23505" {
+				select {
+				case chJobErrorsOut <- JobError{
+					ShouldCancelJob: true,
+					Msg:             fmt.Sprintf("Fatal data integrity error in table %s", identifier.Sanitize()),
+					Prev:            err,
+				}:
+				case <-ctx.Done():
+				}
+				wgActiveChunks.Done()
+				return true
+			}
+		}
+		select {
+		case chErrorsOut <- LoaderError{Chunk: chunk, Msg: err.Error()}:
+		case <-ctx.Done():
+			return true
+		}
+		return false
+	}
+	chunkDuration := time.Since(chunkStartTime)
+	rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds()
+	log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec)
+	wgActiveChunks.Done()
+	return false
 }
 
 func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error {
@@ -134,3 +176,16 @@ func Map[T any, V any](input []T, mapper func(T) V) []V {
 	return result
 }
+
+func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
+	for rows := range in {
+		log.Debugf("Chunk received, loading data into...")
+
+		for i, rowValues := range rows {
+			if i%100 == 0 {
+				logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
+			}
+		}
+	}
+}
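Reviewer note: the loader now treats a PostgreSQL unique-constraint violation (SQLSTATE 23505) as fatal and everything else as retryable, classified with errors.As against *pgconn.PgError. The check in isolation (isUniqueViolation is an illustrative name, not from this PR):

```go
package sketch

import (
	"errors"

	"github.com/jackc/pgx/v5/pgconn"
)

// isUniqueViolation reports whether err is a PostgreSQL unique-constraint
// violation (SQLSTATE 23505), which the loader escalates to a job-cancelling
// error instead of requeueing the chunk.
func isUniqueViolation(err error) bool {
	var pgErr *pgconn.PgError
	return errors.As(err, &pgErr) && pgErr.Code == "23505"
}
```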

View File

@@ -13,5 +13,5 @@ func configureLog() {
 		DisableSorting: false,
 		PadLevelText:   true,
 	})
-	log.SetLevel(log.DebugLevel)
+	log.SetLevel(log.InfoLevel)
 }

View File

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"time"
 
 	log "github.com/sirupsen/logrus"
@@ -37,6 +38,10 @@ const (
 func main() {
 	configureLog()
 	startTime := time.Now()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	log.Info("=== Starting migration ===")
 	log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize)
@@ -50,7 +55,7 @@ func main() {
 	for _, job := range migrationJobs {
 		log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table)
-		processMigrationJob(sourceDb, targetDb, job)
+		processMigrationJob(ctx, sourceDb, targetDb, job)
 	}
 
 	totalDuration := time.Since(startTime)

View File

@@ -2,27 +2,29 @@ package main
 import (
 	"encoding/binary"
+	"errors"
 	"time"
 )
 
-func mssqlUuidToBigEndian(mssqlUuid []byte) []byte {
+func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
 	if len(mssqlUuid) != 16 {
-		return mssqlUuid
+		return nil, errors.New("Invalid uuid")
 	}
 
 	pgUuid := make([]byte, 16)
 	pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0]
 	pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4]
 	pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6]
 	copy(pgUuid[8:], mssqlUuid[8:])
-	return pgUuid
+	return pgUuid, nil
 }
 
 const sridFlag = 0x20000000
 
-func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
+func wkbToEwkbWithSrid(geometry []byte, srid int) ([]byte, error) {
 	if len(geometry) < 5 {
-		return geometry
+		return nil, errors.New("Invalid wkb")
 	}
 
 	var byteOrder binary.ByteOrder
@@ -34,7 +36,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
 	wkbType := byteOrder.Uint32(geometry[1:5])
 	if wkbType&sridFlag != 0 {
-		return geometry
+		return geometry, nil
 	}
 
 	ewkbType := wkbType | sridFlag
@@ -49,7 +51,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
 	copy(result[9:], geometry[5:])
 
-	return result
+	return result, nil
 }
 
 func ensureUTC(t time.Time) time.Time {
@@ -59,3 +61,20 @@ func ensureUTC(t time.Time) time.Time {
 	return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
 }
+
+func ToInt64(v any) (int64, bool) {
+	switch t := v.(type) {
+	case int:
+		return int64(t), true
+	case int8:
+		return int64(t), true
+	case int16:
+		return int64(t), true
+	case int32:
+		return int64(t), true
+	case int64:
+		return int64(t), true
+	default:
+		return 0, false
+	}
+}
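Reviewer note: mssqlUuidToBigEndian reorders the first three GUID groups, which MSSQL stores little-endian, into the big-endian layout Postgres expects; the last eight bytes pass through unchanged. A worked example of the byte swap with illustrative values:

```go
package main

import "fmt"

func main() {
	// MSSQL stores the GUID 01020304-0506-0708-090a-0b0c0d0e0f10 with the
	// first three groups little-endian:
	mssql := []byte{
		0x04, 0x03, 0x02, 0x01, // group 1 reversed
		0x06, 0x05, // group 2 reversed
		0x08, 0x07, // group 3 reversed
		0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, // unchanged
	}

	pg := make([]byte, 16)
	pg[0], pg[1], pg[2], pg[3] = mssql[3], mssql[2], mssql[1], mssql[0]
	pg[4], pg[5] = mssql[5], mssql[4]
	pg[6], pg[7] = mssql[7], mssql[6]
	copy(pg[8:], mssql[8:])

	fmt.Printf("%x\n", pg) // 0102030405060708090a0b0c0d0e0f10
}
```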

View File

@@ -12,7 +12,12 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job MigrationJob) {
+func processMigrationJob(
+	ctx context.Context,
+	sourceDb *sql.DB,
+	targetDb *pgxpool.Pool,
+	job MigrationJob,
+) {
 	jobStartTime := time.Now()
 	log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
@@ -24,91 +29,95 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
 	logColumnTypes(sourceColTypes, "Source col types")
 	logColumnTypes(targetColTypes, "Target col types")
 
-	ctx, cancel := context.WithCancel(context.Background())
+	jobCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	batches, err := batchGeneratorMssql(ctx, sourceDb, job)
+	batches, err := batchGeneratorMssql(jobCtx, sourceDb, job)
 	if err != nil {
 		log.Error("Unexpected error calculating batch ranges: ", err)
 	}
 
-	chJobErrors := make(chan JobError)
-	defer close(chJobErrors)
+	chJobErrors := make(chan JobError, 50)
+	chBatches := make(chan Batch, QueueSize)
+	chExtractorErrors := make(chan ExtractorError, QueueSize)
+	chChunksRaw := make(chan Chunk, QueueSize)
+	chChunksTransformed := make(chan Chunk, QueueSize)
+	chLoadersErrors := make(chan LoaderError, QueueSize)
+
+	var wgActiveBatches sync.WaitGroup
+	var wgActiveChunks sync.WaitGroup
+	var wgExtractors sync.WaitGroup
+	var wgTransformers sync.WaitGroup
+	var wgLoaders sync.WaitGroup
 
 	go func() {
-		if err := jobErrorHandler(chJobErrors); err != nil {
+		if err := jobErrorHandler(jobCtx, chJobErrors); err != nil {
 			cancel()
 		}
 	}()
 
-	chBatches := make(chan Batch, len(batches))
-	chExtractorErrors := make(chan ExtractorError, len(batches))
-	go func() {
-		extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors)
-	}()
-
-	chChunks := make(chan []UnknownRowValues, QueueSize)
+	go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
+	go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
+
 	maxExtractors := min(NumExtractors, len(batches))
-	var wgMssqlExtractors sync.WaitGroup
-	log.Infof("Starting %d MSSQL extractors...", maxExtractors)
+	log.Infof("Starting %d extractors...", maxExtractors)
 	extractStartTime := time.Now()
 	for range maxExtractors {
-		wgMssqlExtractors.Go(func() {
-			extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors, chJobErrors)
+		wgExtractors.Go(func() {
+			extractFromMssql(jobCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
 		})
 	}
 
+	wgActiveBatches.Add(len(batches))
 	go func() {
-		for _, br := range batches {
-			chBatches <- br
+		for _, batch := range batches {
+			chBatches <- batch
 		}
-		close(chBatches)
-		close(chExtractorErrors)
 	}()
 
-	go func() {
-		wgMssqlExtractors.Wait()
-		close(chChunks)
-		log.Infof("Extraction completed in %v", time.Since(extractStartTime))
-	}()
-
-	chRowsTransform := make(chan []UnknownRowValues, QueueSize)
-	var wgMssqlTransformers sync.WaitGroup
-	log.Infof("Starting %d MSSQL transformers...", maxExtractors)
+	log.Infof("Starting %d transformers...", maxExtractors)
 	transformStartTime := time.Now()
 	for range maxExtractors {
-		wgMssqlTransformers.Go(func() {
-			transformRowsMssql(sourceColTypes, chChunks, chRowsTransform)
+		wgTransformers.Go(func() {
+			transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks)
 		})
 	}
 
-	go func() {
-		wgMssqlTransformers.Wait()
-		close(chRowsTransform)
-		log.Infof("Transformation completed in %v", time.Since(transformStartTime))
-	}()
-
-	var wgPostgresLoaders sync.WaitGroup
 	log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
-	loaderStartTime := time.Now()
+	loadStartTime := time.Now()
 	for range NumLoaders {
-		wgPostgresLoaders.Go(func() {
-			if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
-				log.Error("Unexpected error loading data into postgres: ", err)
-			}
+		wgLoaders.Go(func() {
+			loadRowsPostgres(jobCtx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
 		})
 	}
 
-	wgPostgresLoaders.Wait()
-	log.Infof("Loading completed in %v", time.Since(loaderStartTime))
-
-	totalDuration := time.Since(jobStartTime)
-	log.Infof("Migration job completed successfully! Total time: %v", totalDuration)
+	go func() {
+		wgActiveBatches.Wait()
+		close(chBatches)
+		close(chExtractorErrors)
+
+		wgExtractors.Wait()
+		log.Infof("Extraction completed in %v", time.Since(extractStartTime))
+		close(chChunksRaw)
+
+		wgTransformers.Wait()
+		log.Infof("Transformation completed in %v", time.Since(transformStartTime))
+
+		wgActiveChunks.Wait()
+		close(chChunksTransformed)
+		close(chLoadersErrors)
+
+		wgLoaders.Wait()
+		log.Infof("Loading completed in %v", time.Since(loadStartTime))
+		cancel()
+	}()
+
+	<-jobCtx.Done()
+	log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime))
 }
 
 func logColumnTypes(columnTypes []ColumnType, label string) {
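Reviewer note: shutdown is now a cascade. The wgActiveBatches/wgActiveChunks counters track in-flight work items (including retries re-enqueued by the error handlers), so each channel is closed only once nothing can write to it again. A reduced two-stage sketch of that idea, assuming Go 1.25's sync.WaitGroup.Go as already used in this PR:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	items := make(chan int, 8)
	var wgActive sync.WaitGroup  // one count per in-flight item
	var wgWorkers sync.WaitGroup // one count per worker goroutine

	for range 3 {
		wgWorkers.Go(func() {
			for it := range items {
				fmt.Println("processed", it)
				wgActive.Done() // item fully accounted for
			}
		})
	}

	wgActive.Add(5)
	for i := range 5 {
		items <- i
	}

	wgActive.Wait() // every item completed (or exhausted its retries)
	close(items)    // now safe: nothing can re-enqueue
	wgWorkers.Wait()
}
```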

View File

@@ -1,62 +1,149 @@
 package main
 
 import (
+	"context"
+	"errors"
+	"sync"
 	"time"
 
 	log "github.com/sirupsen/logrus"
 )
 
-func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
-	chunkCount := 0
-	totalRowsTransformed := 0
-
-	for rows := range in {
-		chunkStartTime := time.Now()
-		log.Debugf("Chunk #%d received, transforming %d rows...", chunkCount+1, len(rows))
-
-		for _, rowValues := range rows {
-			for i, col := range columns {
-				value := rowValues[i]
-				if col.SystemType() == "uniqueidentifier" {
-					if b, ok := value.([]byte); ok {
-						rowValues[i] = mssqlUuidToBigEndian(b)
-					}
-				} else if col.SystemType() == "geometry" || col.SystemType() == "geography" {
-					if b, ok := value.([]byte); ok {
-						rowValues[i] = wkbToEwkbWithSrid(b, 4326)
-					}
-				} else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" {
-					if t, ok := value.(time.Time); ok {
-						rowValues[i] = ensureUTC(t)
-					}
-				}
-			}
-		}
-		chunkCount++
-		totalRowsTransformed += len(rows)
-		chunkDuration := time.Since(chunkStartTime)
-		rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
-		log.Infof("Transformed chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows",
-			chunkCount, len(rows), chunkDuration, rowsPerSec, totalRowsTransformed)
-		out <- rows
-	}
-}
-
-func ToInt64(v any) (int64, bool) {
-	switch t := v.(type) {
-	case int:
-		return int64(t), true
-	case int8:
-		return int64(t), true
-	case int16:
-		return int64(t), true
-	case int32:
-		return int64(t), true
-	case int64:
-		return int64(t), true
-	default:
-		return 0, false
-	}
-}
+type transformerFunc func(any) (any, error)
+
+type columnTransformPlan struct {
+	index int
+	fn    transformerFunc
+}
+
+func transformRowsMssql(
+	ctx context.Context,
+	columns []ColumnType,
+	chChunksIn <-chan Chunk,
+	chChunksOut chan<- Chunk,
+	chJobErrorsOut chan<- JobError,
+	wgActiveChunks *sync.WaitGroup,
+) {
+	transformationPlan := computeTransformationPlan(columns)
+
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case chunk, ok := <-chChunksIn:
+			if !ok {
+				return
+			}
+			if len(transformationPlan) == 0 {
+				select {
+				case chChunksOut <- chunk:
+					wgActiveChunks.Add(1)
+					continue
+				case <-ctx.Done():
+					return
+				}
+			}
+			chunkStartTime := time.Now()
+			err := processChunk(ctx, &chunk, transformationPlan)
+			if err != nil {
+				if errors.Is(err, ctx.Err()) {
+					return
+				}
+				select {
+				case chJobErrorsOut <- JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
+				case <-ctx.Done():
+				}
+				return
+			}
+			log.Infof("Transformed chunk %s: %d rows in %v", chunk.Id, len(chunk.Data), time.Since(chunkStartTime))
+			select {
+			case chChunksOut <- chunk:
+			case <-ctx.Done():
+				return
+			}
+			wgActiveChunks.Add(1)
+		}
+	}
+}
+
+func computeTransformationPlan(columns []ColumnType) []columnTransformPlan {
+	var plan []columnTransformPlan
+	for i, col := range columns {
+		switch col.SystemType() {
+		case "uniqueidentifier":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if b, ok := v.([]byte); ok && b != nil {
+						return mssqlUuidToBigEndian(b)
+					}
+					return v, nil
+				},
+			})
+		case "geometry", "geography":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if b, ok := v.([]byte); ok && b != nil {
+						return wkbToEwkbWithSrid(b, 4326)
+					}
+					return v, nil
+				},
+			})
+		case "datetime", "datetime2":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if t, ok := v.(time.Time); ok {
+						return ensureUTC(t), nil
+					}
+					return v, nil
+				},
+			})
+		}
+	}
+	return plan
+}
+
+const processChunkCtxCheck = 4096
+
+func processChunk(ctx context.Context, chunk *Chunk, transformationPlan []columnTransformPlan) error {
+	for i, rowValues := range chunk.Data {
+		if i%processChunkCtxCheck == 0 {
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+		}
+		for _, task := range transformationPlan {
+			val := rowValues[task.index]
+			if val == nil {
+				continue
+			}
+
+			transformed, err := task.fn(val)
+			if err != nil {
+				return err
+			}
+			rowValues[task.index] = transformed
+		}
+	}
+	return nil
+}
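Reviewer note: instead of re-running a type switch for every column of every row, the transformer now decides once per job which columns need work and builds a plan of (column index, transform) pairs; each row then executes only those closures. A stripped-down sketch of the plan pattern with a placeholder rule standing in for the real type mapping:

```go
package main

import (
	"fmt"
	"strings"
)

type step struct {
	index int
	fn    func(any) any
}

// buildPlan inspects the column types once and records a transform only for
// the columns that need one, mirroring computeTransformationPlan above.
func buildPlan(colTypes []string) []step {
	var plan []step
	for i, t := range colTypes {
		if t == "text" { // placeholder rule for illustration
			plan = append(plan, step{i, func(v any) any {
				if s, ok := v.(string); ok {
					return strings.ToUpper(s)
				}
				return v
			}})
		}
	}
	return plan
}

func main() {
	plan := buildPlan([]string{"int", "text"})
	row := []any{1, "abc"}
	for _, s := range plan { // per row: run only the recorded steps
		row[s.index] = s.fn(row[s.index])
	}
	fmt.Println(row) // [1 ABC]
}
```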