Compare commits
11 Commits
e158986947 ... a0b51f40c1
| Author | SHA1 | Date |
|---|---|---|
|  | a0b51f40c1 |  |
|  | b64a76ca45 |  |
|  | 51480015ba |  |
|  | dc632361e5 |  |
|  | 0ee5d9032c |  |
|  | d3a3b26bb3 |  |
|  | 554618daad |  |
|  | 7924dd3af7 |  |
|  | f6dfcd390f |  |
|  | 853be4a5a6 |  |
|  | eeef3bc813 |  |
@@ -1,7 +1,9 @@
 package main

 import (
+	"context"
 	"fmt"
+	"sync"

 	"github.com/google/uuid"
 )
@@ -19,15 +21,41 @@ func (e *ExtractorError) Error() string {

 const maxRetryAttempts = 3

-func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chJobErrorsOut chan<- JobError) {
-	for err := range chErrorsIn {
+func extractorErrorHandler(
+	ctx context.Context,
+	chErrorsIn <-chan ExtractorError,
+	chBatchesOut chan<- Batch,
+	chJobErrorsOut chan<- JobError,
+	wgActiveBatches *sync.WaitGroup,
+) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+
+		case err, ok := <-chErrorsIn:
+			if !ok {
+				return
+			}
+
 			if err.RetryCounter >= maxRetryAttempts {
 				jobError := JobError{
 					ShouldCancelJob: false,
 					Msg:             fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
 					Prev:            &err,
 				}
-			chJobErrorsOut <- jobError
+
+				select {
+				case chJobErrorsOut <- jobError:
+				case <-ctx.Done():
+					return
+				}
+
+				wgActiveBatches.Done()
 				continue
 			}
@@ -41,7 +69,12 @@ func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<-
 				newBatch.IsLowerLimitInclusive = false
 			}

-		chBatchesOut <- newBatch
+			select {
+			case chBatchesOut <- newBatch:
+			case <-ctx.Done():
+				return
+			}
+		}
 	}
 }
@@ -52,20 +85,18 @@ func ExtractorErrorFromLastRowMssql(lastRow UnknownRowValues, indexPrimaryKey in
 	if !ok {
 		currentBatch := *batch
 		currentBatch.RetryCounter = maxRetryAttempts
-		exError := ExtractorError{
+		return ExtractorError{
 			Batch:     currentBatch,
 			HasLastId: true,
 			Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
 		}
-		return exError
 	}

-	exError := ExtractorError{
+	return ExtractorError{
 		Batch:     *batch,
 		HasLastId: true,
 		LastId:    lastId,
 		Msg:       previousError.Error(),
 	}
-	return exError
 }
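The recurring change in this file, and throughout the rest of the compare, is replacing a bare channel send with a `select` that also watches `ctx.Done()`, so a handler blocked on a full channel can still exit when the job is cancelled. A minimal, self-contained sketch of the pattern (`trySend` is an illustrative name, not from this repo):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// trySend blocks until the value is accepted or the context is
// cancelled; it reports whether the send actually happened.
func trySend[T any](ctx context.Context, ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	ch := make(chan int) // unbuffered: a send blocks until someone receives

	go func() {
		time.Sleep(50 * time.Millisecond)
		cancel() // nobody ever receives; cancellation unblocks the sender
	}()

	if !trySend(ctx, ch, 42) {
		fmt.Println("send abandoned:", ctx.Err())
	}
}
```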
@@ -3,10 +3,13 @@ package main
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"slices"
 	"strings"
+	"sync"
 	"time"

+	"github.com/google/uuid"
 	"github.com/jackc/pgx/v5/pgxpool"
 	_ "github.com/microsoft/go-mssqldb"
 	log "github.com/sirupsen/logrus"
@@ -14,6 +17,13 @@ import (

 type UnknownRowValues = []any

+type Chunk struct {
+	Id           uuid.UUID
+	BatchId      uuid.UUID
+	Data         []UnknownRowValues
+	RetryCounter int
+}
+
 func extractFromMssql(
 	ctx context.Context,
 	db *sql.DB,
@@ -21,25 +31,62 @@ func extractFromMssql(
 	columns []ColumnType,
 	chunkSize int,
 	chBatchesIn <-chan Batch,
-	chChunksOut chan<- []UnknownRowValues,
+	chChunksOut chan<- Chunk,
 	chErrorsOut chan<- ExtractorError,
 	chJobErrorsOut chan<- JobError,
+	wgActiveBatches *sync.WaitGroup,
 ) {
 	indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
 		return strings.EqualFold(col.name, job.PrimaryKey)
 	})

 	if indexPrimaryKey == -1 {
-		exError := JobError{
+		jobError := JobError{
 			ShouldCancelJob: true,
 			Msg:             "Primary key not found in provided columns",
 		}
-		chJobErrorsOut <- exError
+
+		select {
+		case <-ctx.Done():
+			return
+		case chJobErrorsOut <- jobError:
+		}
+
 		return
 	}

-	for batch := range chBatchesIn {
-		func() {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case batch, ok := <-chBatchesIn:
+			if !ok {
+				return
+			}
+
+			if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort {
+				return
+			}
+		}
+	}
+}
+
+func processBatch(
+	ctx context.Context,
+	db *sql.DB,
+	job MigrationJob,
+	columns []ColumnType,
+	chunkSize int,
+	batch Batch,
+	indexPrimaryKey int,
+	chChunksOut chan<- Chunk,
+	chErrorsOut chan<- ExtractorError,
+	wgActiveBatches *sync.WaitGroup,
+) (abort bool) {
 	query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
 	log.Debug("Query used to extract data from mssql: ", query)
@@ -54,13 +101,12 @@ func extractFromMssql(
 	queryStartTime := time.Now()
 	rows, err := db.QueryContext(ctx, query, queryArgs...)
 	if err != nil {
-		exError := ExtractorError{
-			Batch:     batch,
-			HasLastId: false,
-			Msg:       err.Error(),
+		select {
+		case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
+		case <-ctx.Done():
+			return true
 		}
-		chErrorsOut <- exError
-		return
+		return false
 	}
 	defer rows.Close()
 	log.Debugf("Query executed in %v", time.Since(queryStartTime))
@@ -79,19 +125,28 @@ func extractFromMssql(

 		if err := rows.Scan(scanArgs...); err != nil {
 			if len(rowsChunk) == 0 {
-				exError := ExtractorError{
-					Batch:     batch,
-					HasLastId: false,
-					Msg:       err.Error(),
+				select {
+				case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
+				case <-ctx.Done():
+					return true
 				}
-				chErrorsOut <- exError
-				return
+				return false
 			}

 			lastRow := rowsChunk[len(rowsChunk)-1]
-			chChunksOut <- rowsChunk
-			chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
-			return
+			select {
+			case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err):
+			case <-ctx.Done():
+				return true
+			}
+
+			select {
+			case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case <-ctx.Done():
+				return true
+			}
+
+			return false
 		}

 		rowsChunk = append(rowsChunk, values)
@@ -100,39 +155,55 @@ func extractFromMssql(
 		if len(rowsChunk) >= chunkSize {
 			chunkDuration := time.Since(chunkStartTime)
 			rowsPerSec := float64(chunkSize) / chunkDuration.Seconds()
-			log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
-				len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
-			chChunksOut <- rowsChunk
+			log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
+
+			select {
+			case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case <-ctx.Done():
+				return true
+			}
+
 			rowsChunk = make([]UnknownRowValues, 0, chunkSize)
 			chunkStartTime = time.Now()
 		}
 	}

-	if len(rowsChunk) > 0 {
-		chunkDuration := time.Since(chunkStartTime)
-		rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
-		log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows",
-			len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
-		chChunksOut <- rowsChunk
-	}
-
 	if err := rows.Err(); err != nil {
+		if errors.Is(err, ctx.Err()) {
+			return true
+		}
+
 		if len(rowsChunk) == 0 {
-			exError := ExtractorError{
-				Batch:     batch,
-				HasLastId: false,
-				Msg:       err.Error(),
+			select {
+			case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
+			case <-ctx.Done():
+				return true
 			}
-			chErrorsOut <- exError
-			return
+			return false
 		}

 		lastRow := rowsChunk[len(rowsChunk)-1]
-		chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
-		return
-	}()
+		select {
+		case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err):
+		case <-ctx.Done():
+			return true
+		}
+		return false
 	}
+
+	if len(rowsChunk) > 0 {
+		chunkDuration := time.Since(chunkStartTime)
+		rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
+		log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
+
+		select {
+		case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+		case <-ctx.Done():
+			return true
+		}
+	}
+
+	wgActiveBatches.Done()
+	return false
 }

 func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error {
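The rewritten extractor also swaps `for batch := range chBatchesIn` for an explicit receive loop, so a worker parked on an empty channel wakes up on cancellation as well as on channel close. A stripped-down sketch of that consumer shape (the `work` callback here is a stand-in for the repo's `processBatch`, not its actual signature):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consume drains jobs until the channel closes or ctx is cancelled,
// mirroring the for/select shape used by extractFromMssql.
func consume(ctx context.Context, jobs <-chan int, work func(int) (abort bool)) {
	for {
		if ctx.Err() != nil {
			return // cheap pre-check before blocking again
		}

		select {
		case <-ctx.Done():
			return
		case j, ok := <-jobs:
			if !ok {
				return // producer closed the channel: normal shutdown
			}
			if work(j) {
				return
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	jobs := make(chan int, 4)
	go func() {
		for i := 0; ; i++ {
			jobs <- i // never closes; the timeout ends the consumer instead
		}
	}()

	consume(ctx, jobs, func(j int) bool {
		fmt.Println("processed", j)
		return false
	})
}
```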
@@ -1,6 +1,7 @@
 package main

 import (
+	"context"
 	"fmt"

 	log "github.com/sirupsen/logrus"
@@ -20,14 +21,27 @@ func (e *JobError) Error() string {
 	return e.Msg
 }

-func jobErrorHandler(chErrorsIn <-chan JobError) error {
-	for err := range chErrorsIn {
+func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
+	for {
+		if ctx.Err() != nil {
+			return nil
+		}
+
+		select {
+		case <-ctx.Done():
+			return nil
+
+		case err, ok := <-chErrorsIn:
+			if !ok {
+				return nil
+			}
+
 			if err.ShouldCancelJob {
 				log.Error(err.Msg, " - ", err.Prev)
 				return &err
 			}

-		log.Error(err)
+			log.Error(err.Msg, " - ", err.Prev)
+		}
 	}
-
-	return nil
 }
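`jobErrorHandler` acts as the pipeline's terminator: it logs retryable failures and returns a non-nil error only for `ShouldCancelJob`, which the caller (see the `processMigrationJob` diff further down) turns into a `cancel()`. A compact sketch of that wiring, with hypothetical names standing in for the repo's types:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type fatalErr struct{ msg string }

func (e *fatalErr) Error() string { return e.msg }

// watch returns nil on cancellation or channel close, and the first
// fatal error otherwise -- the same contract as jobErrorHandler.
func watch(ctx context.Context, errs <-chan error) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case err, ok := <-errs:
			if !ok {
				return nil
			}
			var fe *fatalErr
			if errors.As(err, &fe) {
				return err // fatal: caller cancels the whole job
			}
			fmt.Println("retryable:", err) // non-fatal: just log
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	errs := make(chan error, 2)
	errs <- fmt.Errorf("transient hiccup")
	errs <- &fatalErr{msg: "primary key missing"}

	go func() {
		if err := watch(ctx, errs); err != nil {
			fmt.Println("cancelling job:", err)
			cancel()
		}
	}()

	<-ctx.Done()
}
```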
cmd/go_migrate/loader-error-handler.go (new file, 65 lines)

@@ -0,0 +1,65 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"sync"
+)
+
+type LoaderError struct {
+	Chunk
+	Msg string
+}
+
+func (e *LoaderError) Error() string {
+	return e.Msg
+}
+
+func loaderErrorHandler(
+	ctx context.Context,
+	chErrorsIn <-chan LoaderError,
+	chChunksOut chan<- Chunk,
+	chJobErrorsOut chan<- JobError,
+	wgActiveChunks *sync.WaitGroup,
+) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+
+		case err, ok := <-chErrorsIn:
+			if !ok {
+				return
+			}
+
+			if err.RetryCounter >= maxRetryAttempts {
+				jobError := JobError{
+					ShouldCancelJob: false,
+					Msg:             fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts),
+					Prev:            &err,
+				}
+
+				select {
+				case chJobErrorsOut <- jobError:
+				case <-ctx.Done():
+					return
+				}
+
+				wgActiveChunks.Done()
+				continue
+			}
+
+			err.RetryCounter++
+
+			select {
+			case chChunksOut <- err.Chunk:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
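The new handler implements retries by re-queueing: a failed chunk carries its own `RetryCounter`, gets incremented, and goes back onto the same channel the loaders consume; past `maxRetryAttempts` it is reported and dropped (with `wgActiveChunks.Done()` settling the accounting). A minimal sketch of the requeue loop, independent of the repo's types:

```go
package main

import "fmt"

type task struct {
	id      int
	retries int
}

const maxAttempts = 3

// requeueOrDrop either puts the failed task back on the work queue or,
// once it has exhausted its attempts, reports it and drops it.
func requeueOrDrop(t task, work chan<- task, dead chan<- task) {
	if t.retries >= maxAttempts {
		dead <- t // give up: surface to the job-level handler
		return
	}
	t.retries++
	work <- t // retry: the workers will pick it up again
}

func main() {
	work := make(chan task, 4)
	dead := make(chan task, 4)

	t := task{id: 7}
	for i := 0; i < 4; i++ { // simulate four consecutive failures
		requeueOrDrop(t, work, dead)
		select {
		case t = <-work:
			fmt.Printf("retrying task %d (attempt %d)\n", t.id, t.retries)
		case d := <-dead:
			fmt.Printf("task %d dropped after %d attempts\n", d.id, d.retries)
		}
	}
}
```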
@@ -3,61 +3,103 @@ package main
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"fmt"
+	"sync"
 	"time"

 	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/jackc/pgx/v5/pgxpool"
 	mssql "github.com/microsoft/go-mssqldb"
 	log "github.com/sirupsen/logrus"
 )

-func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
+func loadRowsPostgres(
+	ctx context.Context,
+	db *pgxpool.Pool,
+	job MigrationJob,
+	columns []ColumnType,
+	chChunksIn <-chan Chunk,
+	chErrorsOut chan<- LoaderError,
+	chJobErrorsOut chan<- JobError,
+	wgActiveChunks *sync.WaitGroup,
+) {
+	tableId := pgx.Identifier{job.Schema, job.Table}
+	colNames := Map(columns, func(col ColumnType) string {
+		return col.name
+	})

-	for rows := range in {
-		log.Debugf("Chunk received, loading data into...")
+	for {
+		if ctx.Err() != nil {
+			return
+		}

-		for i, rowValues := range rows {
-			if i%100 == 0 {
-				logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
+		select {
+		case <-ctx.Done():
+			return
+		case chunk, ok := <-chChunksIn:
+			if !ok {
+				return
+			}
+
+			if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks); abort {
+				return
 			}
 		}
 	}
 }

-func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error {
-	chunkCount := 0
-	totalRowsLoaded := 0
-
-	for rows := range in {
+func loadChunkPostgres(
+	ctx context.Context,
+	db *pgxpool.Pool,
+	identifier pgx.Identifier,
+	colNames []string,
+	chunk Chunk,
+	chErrorsOut chan<- LoaderError,
+	chJobErrorsOut chan<- JobError,
+	wgActiveChunks *sync.WaitGroup,
+) (abort bool) {
 	chunkStartTime := time.Now()
-		identifier := pgx.Identifier{job.Schema, job.Table}
-		colNames := Map(columns, func(col ColumnType) string {
-			return col.name
-		})

-		copyStartTime := time.Now()
 	_, err := db.CopyFrom(
 		ctx,
 		identifier,
 		colNames,
-		pgx.CopyFromRows(rows),
+		pgx.CopyFromRows(chunk.Data),
 	)

 	if err != nil {
-		return err
+		var pgErr *pgconn.PgError
+		if errors.As(err, &pgErr) {
+			if pgErr.Code == "23505" {
+				select {
+				case chJobErrorsOut <- JobError{
+					ShouldCancelJob: true,
+					Msg:             fmt.Sprintf("Fatal data integrity error in table %s", identifier.Sanitize()),
+					Prev:            err,
+				}:
+				case <-ctx.Done():
+				}
+				wgActiveChunks.Done()
+				return true
+			}
+		}
+
+		select {
+		case chErrorsOut <- LoaderError{Chunk: chunk, Msg: err.Error()}:
+		case <-ctx.Done():
+			return true
+		}
+		return false
 	}

-		chunkCount++
-		totalRowsLoaded += len(rows)
-		copyDuration := time.Since(copyStartTime)
 	chunkDuration := time.Since(chunkStartTime)
-		rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
+	rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds()

-		log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
-	}
+	log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec)

-	return nil
+	wgActiveChunks.Done()
+	return false
 }

 func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error {
@@ -134,3 +176,16 @@ func Map[T any, V any](input []T, mapper func(T) V) []V {

 	return result
 }
+
+func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
+
+	for rows := range in {
+		log.Debugf("Chunk received, loading data into...")
+
+		for i, rowValues := range rows {
+			if i%100 == 0 {
+				logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
+			}
+		}
+	}
+}
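The loader distinguishes a retryable COPY failure from a fatal one by unwrapping the pgx error with `errors.As` and checking the PostgreSQL SQLSTATE: `23505` is `unique_violation`, which retrying the same rows can never fix, so it escalates to a job-cancelling error instead of a requeue. A hedged sketch of that classification (table, columns, and DSN below are placeholders, not the repo's values):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgxpool"
)

// copyRows bulk-loads rows with COPY and reports whether a failure is
// worth retrying. SQLSTATE 23505 (unique_violation) is deterministic,
// so retrying the same chunk would fail forever.
func copyRows(ctx context.Context, db *pgxpool.Pool, rows [][]any) (retryable bool, err error) {
	_, err = db.CopyFrom(
		ctx,
		pgx.Identifier{"public", "target_table"}, // placeholder table
		[]string{"id", "name"},                   // placeholder columns
		pgx.CopyFromRows(rows),
	)
	if err == nil {
		return false, nil
	}

	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) && pgErr.Code == "23505" {
		return false, fmt.Errorf("fatal integrity error: %w", err)
	}
	return true, err // e.g. a transient network failure: requeue the chunk
}

func main() {
	ctx := context.Background()
	db, err := pgxpool.New(ctx, "postgres://user:pass@localhost:5432/db") // placeholder DSN
	if err != nil {
		panic(err)
	}
	defer db.Close()

	if retry, err := copyRows(ctx, db, [][]any{{1, "a"}, {2, "b"}}); err != nil {
		fmt.Println("copy failed, retryable:", retry, "err:", err)
	}
}
```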
@@ -13,5 +13,5 @@ func configureLog() {
 		DisableSorting: false,
 		PadLevelText:   true,
 	})
-	log.SetLevel(log.DebugLevel)
+	log.SetLevel(log.InfoLevel)
 }
@@ -1,6 +1,7 @@
 package main

 import (
+	"context"
 	"time"

 	log "github.com/sirupsen/logrus"
@@ -37,6 +38,10 @@ const (
 func main() {
 	configureLog()
 	startTime := time.Now()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()

 	log.Info("=== Starting migration ===")
 	log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize)
@@ -50,7 +55,7 @@ func main() {

 	for _, job := range migrationJobs {
 		log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table)
-		processMigrationJob(sourceDb, targetDb, job)
+		processMigrationJob(ctx, sourceDb, targetDb, job)
 	}

 	totalDuration := time.Since(startTime)
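With this change every job shares one root context created in `main`, and (as the `processMigrationJob` diff below shows) each job derives its own child via `context.WithCancel(ctx)`: a root-level `cancel()` stops all jobs, while a job-level cancel stops only its own pipeline. A tiny illustration of that parent/child relationship:

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	root, cancelAll := context.WithCancel(context.Background())
	defer cancelAll()

	jobA, cancelA := context.WithCancel(root) // one child per migration job
	jobB, cancelB := context.WithCancel(root)
	defer cancelB()

	cancelA() // a job-level cancel stops only that job's pipeline
	fmt.Println("jobA:", jobA.Err(), "| jobB:", jobB.Err(), "| root:", root.Err())

	cancelAll() // a root cancel propagates to every remaining child
	fmt.Println("jobB after root cancel:", jobB.Err())
}
```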
@@ -2,27 +2,29 @@ package main

 import (
 	"encoding/binary"
+	"errors"
 	"time"
 )

-func mssqlUuidToBigEndian(mssqlUuid []byte) []byte {
+func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
 	if len(mssqlUuid) != 16 {
-		return mssqlUuid
+		return nil, errors.New("Invalid uuid")
 	}

 	pgUuid := make([]byte, 16)
 	pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0]
 	pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4]
 	pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6]
 	copy(pgUuid[8:], mssqlUuid[8:])

-	return pgUuid
+	return pgUuid, nil
 }

 const sridFlag = 0x20000000

-func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
+func wkbToEwkbWithSrid(geometry []byte, srid int) ([]byte, error) {
 	if len(geometry) < 5 {
-		return geometry
+		return nil, errors.New("Invalid wkb")
 	}

 	var byteOrder binary.ByteOrder
@@ -34,7 +36,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {

 	wkbType := byteOrder.Uint32(geometry[1:5])
 	if wkbType&sridFlag != 0 {
-		return geometry
+		return geometry, nil
 	}

 	ewkbType := wkbType | sridFlag
@@ -49,7 +51,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {

 	copy(result[9:], geometry[5:])

-	return result
+	return result, nil
 }

 func ensureUTC(t time.Time) time.Time {
@@ -59,3 +61,20 @@ func ensureUTC(t time.Time) time.Time {

 	return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
 }
+
+func ToInt64(v any) (int64, bool) {
+	switch t := v.(type) {
+	case int:
+		return int64(t), true
+	case int8:
+		return int64(t), true
+	case int16:
+		return int64(t), true
+	case int32:
+		return int64(t), true
+	case int64:
+		return int64(t), true
+	default:
+		return 0, false
+	}
+}
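The byte swap exists because SQL Server stores the first three fields of a `uniqueidentifier` little-endian, while PostgreSQL's `uuid` (per RFC 4122) is big-endian throughout; only the last eight bytes match. A worked example of the same swap on a known GUID (the sample value is illustrative):

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// swapGuidByteOrder converts the mixed-endian on-disk layout of a SQL
// Server GUID to RFC 4122 big-endian: reverse the 4-byte, 2-byte and
// 2-byte groups, and keep the final 8 bytes as-is.
func swapGuidByteOrder(b []byte) []byte {
	out := make([]byte, 16)
	out[0], out[1], out[2], out[3] = b[3], b[2], b[1], b[0]
	out[4], out[5] = b[5], b[4]
	out[6], out[7] = b[7], b[6]
	copy(out[8:], b[8:])
	return out
}

func main() {
	// The GUID 33221100-5544-7766-8899-aabbccddeeff as SQL Server stores it:
	raw, _ := hex.DecodeString("00112233445566778899aabbccddeeff")
	fmt.Printf("%x\n", swapGuidByteOrder(raw))
	// prints 33221100554477668899aabbccddeeff -- the RFC 4122 byte order
}
```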
@@ -12,7 +12,12 @@ import (
 	log "github.com/sirupsen/logrus"
 )

-func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job MigrationJob) {
+func processMigrationJob(
+	ctx context.Context,
+	sourceDb *sql.DB,
+	targetDb *pgxpool.Pool,
+	job MigrationJob,
+) {
 	jobStartTime := time.Now()
 	log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)

@@ -24,91 +29,95 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
 	logColumnTypes(sourceColTypes, "Source col types")
 	logColumnTypes(targetColTypes, "Target col types")

-	ctx, cancel := context.WithCancel(context.Background())
+	jobCtx, cancel := context.WithCancel(ctx)
 	defer cancel()

-	batches, err := batchGeneratorMssql(ctx, sourceDb, job)
+	batches, err := batchGeneratorMssql(jobCtx, sourceDb, job)
 	if err != nil {
 		log.Error("Unexpected error calculating batch ranges: ", err)
 	}

-	chJobErrors := make(chan JobError)
-	defer close(chJobErrors)
+	chJobErrors := make(chan JobError, 50)
+	chBatches := make(chan Batch, QueueSize)
+	chExtractorErrors := make(chan ExtractorError, QueueSize)
+	chChunksRaw := make(chan Chunk, QueueSize)
+	chChunksTransformed := make(chan Chunk, QueueSize)
+	chLoadersErrors := make(chan LoaderError, QueueSize)
+
+	var wgActiveBatches sync.WaitGroup
+	var wgActiveChunks sync.WaitGroup
+	var wgExtractors sync.WaitGroup
+	var wgTransformers sync.WaitGroup
+	var wgLoaders sync.WaitGroup

 	go func() {
-		if err := jobErrorHandler(chJobErrors); err != nil {
+		if err := jobErrorHandler(jobCtx, chJobErrors); err != nil {
 			cancel()
 		}
 	}()

-	chBatches := make(chan Batch, len(batches))
-	chExtractorErrors := make(chan ExtractorError, len(batches))
-
-	go func() {
-		extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors)
-	}()
+	go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
+	go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)

-	chChunks := make(chan []UnknownRowValues, QueueSize)
 	maxExtractors := min(NumExtractors, len(batches))
-	var wgMssqlExtractors sync.WaitGroup

-	log.Infof("Starting %d MSSQL extractors...", maxExtractors)
+	log.Infof("Starting %d extractors...", maxExtractors)
 	extractStartTime := time.Now()

 	for range maxExtractors {
-		wgMssqlExtractors.Go(func() {
-			extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors, chJobErrors)
+		wgExtractors.Go(func() {
+			extractFromMssql(jobCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
 		})
 	}

+	wgActiveBatches.Add(len(batches))
 	go func() {
-		for _, br := range batches {
-			chBatches <- br
+		for _, batch := range batches {
+			chBatches <- batch
 		}
-		close(chBatches)
-		close(chExtractorErrors)
-	}()
-
-	go func() {
-		wgMssqlExtractors.Wait()
-		close(chChunks)
-		log.Infof("Extraction completed in %v", time.Since(extractStartTime))
 	}()

-	chRowsTransform := make(chan []UnknownRowValues, QueueSize)
-	var wgMssqlTransformers sync.WaitGroup
-
-	log.Infof("Starting %d MSSQL transformers...", maxExtractors)
+	log.Infof("Starting %d transformers...", maxExtractors)
 	transformStartTime := time.Now()

 	for range maxExtractors {
-		wgMssqlTransformers.Go(func() {
-			transformRowsMssql(sourceColTypes, chChunks, chRowsTransform)
+		wgTransformers.Go(func() {
+			transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks)
 		})
 	}

-	go func() {
-		wgMssqlTransformers.Wait()
-		close(chRowsTransform)
-		log.Infof("Transformation completed in %v", time.Since(transformStartTime))
-	}()
-
-	var wgPostgresLoaders sync.WaitGroup
-
 	log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
-	loaderStartTime := time.Now()
+	loadStartTime := time.Now()

 	for range NumLoaders {
-		wgPostgresLoaders.Go(func() {
-			if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
-				log.Error("Unexpected error loading data into postgres: ", err)
-			}
+		wgLoaders.Go(func() {
+			loadRowsPostgres(jobCtx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
 		})
 	}

-	wgPostgresLoaders.Wait()
-	log.Infof("Loading completed in %v", time.Since(loaderStartTime))
+	go func() {
+		wgActiveBatches.Wait()
+		close(chBatches)
+		close(chExtractorErrors)

-	totalDuration := time.Since(jobStartTime)
-	log.Infof("Migration job completed successfully! Total time: %v", totalDuration)
+		wgExtractors.Wait()
+		log.Infof("Extraction completed in %v", time.Since(extractStartTime))
+		close(chChunksRaw)
+
+		wgTransformers.Wait()
+		log.Infof("Transformation completed in %v", time.Since(transformStartTime))
+
+		wgActiveChunks.Wait()
+		close(chChunksTransformed)
+		close(chLoadersErrors)
+
+		wgLoaders.Wait()
+		log.Infof("Loading completed in %v", time.Since(loadStartTime))
+
+		cancel()
+	}()
+
+	<-jobCtx.Done()
+	log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime))
 }

 func logColumnTypes(columnTypes []ColumnType, label string) {
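The new orchestration replaces "wait for loaders, then declare success" with a dedicated shutdown goroutine that tears the pipeline down stage by stage: wait until every batch is settled, close the channels feeding the extractors, wait for the extractors, close their output, and so on, finishing with `cancel()` so the blocked `<-jobCtx.Done()` in the main flow returns. A condensed sketch of that staged close, with generic stage names in place of the repo's:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	input := make(chan int, 8)
	output := make(chan int, 8)
	done := make(chan struct{})

	var wgStage1, wgStage2 sync.WaitGroup

	wgStage1.Add(1)
	go func() { // stage 1: worker reading from input
		defer wgStage1.Done()
		for v := range input {
			output <- v * v
		}
	}()

	wgStage2.Add(1)
	go func() { // stage 2: consumer reading from output
		defer wgStage2.Done()
		for v := range output {
			fmt.Println("loaded", v)
		}
	}()

	go func() { // shutdown goroutine: close each channel only after
		// everything that writes to it has finished.
		wgStage1.Wait()
		close(output)
		wgStage2.Wait()
		close(done) // plays the role of cancel()
	}()

	for i := 1; i <= 3; i++ {
		input <- i
	}
	close(input) // no more work: lets stage 1 drain and exit

	<-done
	fmt.Println("pipeline drained")
}
```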
@@ -1,62 +1,149 @@
 package main

 import (
+	"context"
+	"errors"
+	"sync"
 	"time"

 	log "github.com/sirupsen/logrus"
 )

-func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
-	chunkCount := 0
-	totalRowsTransformed := 0
+type transformerFunc func(any) (any, error)
+
+type columnTransformPlan struct {
+	index int
+	fn    transformerFunc
+}
+
+func transformRowsMssql(
+	ctx context.Context,
+	columns []ColumnType,
+	chChunksIn <-chan Chunk,
+	chChunksOut chan<- Chunk,
+	chJobErrorsOut chan<- JobError,
+	wgActiveChunks *sync.WaitGroup,
+) {
+	transformationPlan := computeTransformationPlan(columns)
+
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+
+		case chunk, ok := <-chChunksIn:
+			if !ok {
+				return
+			}
+
+			if len(transformationPlan) == 0 {
+				select {
+				case chChunksOut <- chunk:
+					wgActiveChunks.Add(1)
+					continue
+				case <-ctx.Done():
+					return
+				}
+			}

-	for rows := range in {
 			chunkStartTime := time.Now()
-		log.Debugf("Chunk #%d received, transforming %d rows...", chunkCount+1, len(rows))

-		for _, rowValues := range rows {
+			err := processChunk(ctx, &chunk, transformationPlan)
+			if err != nil {
+				if errors.Is(err, ctx.Err()) {
+					return
+				}
+
+				select {
+				case chJobErrorsOut <- JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
+				case <-ctx.Done():
+				}
+				return
+			}
+
+			log.Infof("Transformed chunk %s: %d rows in %v", chunk.Id, len(chunk.Data), time.Since(chunkStartTime))
+
+			select {
+			case chChunksOut <- chunk:
+			case <-ctx.Done():
+				return
+			}
+
+			wgActiveChunks.Add(1)
+		}
+	}
+}
+
+func computeTransformationPlan(columns []ColumnType) []columnTransformPlan {
+	var plan []columnTransformPlan
+
 	for i, col := range columns {
-			value := rowValues[i]
-			if col.SystemType() == "uniqueidentifier" {
-				if b, ok := value.([]byte); ok {
-					rowValues[i] = mssqlUuidToBigEndian(b)
-				}
-			} else if col.SystemType() == "geometry" || col.SystemType() == "geography" {
-				if b, ok := value.([]byte); ok {
-					rowValues[i] = wkbToEwkbWithSrid(b, 4326)
-				}
-			} else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" {
-				if t, ok := value.(time.Time); ok {
-					rowValues[i] = ensureUTC(t)
-				}
-			}
+		switch col.SystemType() {
+		case "uniqueidentifier":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if b, ok := v.([]byte); ok && b != nil {
+						return mssqlUuidToBigEndian(b)
+					}
+					return v, nil
+				},
+			})
+
+		case "geometry", "geography":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if b, ok := v.([]byte); ok && b != nil {
+						return wkbToEwkbWithSrid(b, 4326)
+					}
+					return v, nil
+				},
+			})
+
+		case "datetime", "datetime2":
+			plan = append(plan, columnTransformPlan{
+				index: i,
+				fn: func(v any) (any, error) {
+					if t, ok := v.(time.Time); ok {
+						return ensureUTC(t), nil
+					}
+					return v, nil
+				},
+			})
 		}
 	}

-		chunkCount++
-		totalRowsTransformed += len(rows)
-		chunkDuration := time.Since(chunkStartTime)
-		rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
-		log.Infof("Transformed chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows",
-			chunkCount, len(rows), chunkDuration, rowsPerSec, totalRowsTransformed)
-
-		out <- rows
-	}
+	return plan
 }

-func ToInt64(v any) (int64, bool) {
-	switch t := v.(type) {
-	case int:
-		return int64(t), true
-	case int8:
-		return int64(t), true
-	case int16:
-		return int64(t), true
-	case int32:
-		return int64(t), true
-	case int64:
-		return int64(t), true
-	default:
-		return 0, false
-	}
-}
+const processChunkCtxCheck = 4096
+
+func processChunk(ctx context.Context, chunk *Chunk, transformationPlan []columnTransformPlan) error {
+	for i, rowValues := range chunk.Data {
+		if i%processChunkCtxCheck == 0 {
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+		}
+
+		for _, task := range transformationPlan {
+			val := rowValues[task.index]
+			if val == nil {
+				continue
+			}
+
+			transformed, err := task.fn(val)
+			if err != nil {
+				return err
+			}
+			rowValues[task.index] = transformed
+		}
+	}
+
+	return nil
+}
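The rewrite hoists the per-column type dispatch out of the row loop: instead of re-checking `col.SystemType()` for every value of every row, it compiles the column list once into a slice of `(index, func)` pairs, and the hot loop only applies the precomputed closures, skipping untouched columns entirely. A small self-contained sketch of the same "compile once, apply many" idea, with made-up transform names:

```go
package main

import (
	"fmt"
	"strings"
)

type colPlan struct {
	index int
	fn    func(any) any
}

// compilePlan decides per column, once, which transform applies;
// columns that need no work simply do not appear in the plan.
func compilePlan(types []string) []colPlan {
	var plan []colPlan
	for i, t := range types {
		switch t {
		case "upper":
			plan = append(plan, colPlan{i, func(v any) any {
				if s, ok := v.(string); ok {
					return strings.ToUpper(s)
				}
				return v
			}})
		case "negate":
			plan = append(plan, colPlan{i, func(v any) any {
				if n, ok := v.(int); ok {
					return -n
				}
				return v
			}})
		}
	}
	return plan
}

func main() {
	plan := compilePlan([]string{"upper", "plain", "negate"})

	rows := [][]any{{"go", 1, 10}, {"pg", 2, 20}}
	for _, row := range rows { // hot loop: no type dispatch left
		for _, p := range plan {
			row[p.index] = p.fn(row[p.index])
		}
	}
	fmt.Println(rows) // [[GO 1 -10] [PG 2 -20]]
}
```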