Files
go-migrate/cmd/go_migrate/process.go

130 lines
3.3 KiB
Go

package main
import (
"context"
"database/sql"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
// processMigrationJob runs the full extract-transform-load pipeline for one
// table migration: batch ranges are computed on the MSSQL source, a bounded
// pool of extractors reads chunks, a pool of transformers converts rows, and
// a pool of loaders writes them into PostgreSQL. The function blocks until
// loading finishes and logs per-stage and total durations.
func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job MigrationJob) {
jobStartTime := time.Now()
log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job)
if err != nil {
log.Fatal("Unexpected error: ", err)
}
logColumnTypes(sourceColTypes, "Source col types")
logColumnTypes(targetColTypes, "Target col types")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
batches, err := batchGeneratorMssql(ctx, sourceDb, job)
if err != nil {
// BUGFIX: previously the pipeline carried on with a nil/partial batch
// list after a batching error; abort the job instead.
log.Error("Unexpected error calculating batch ranges: ", err)
return
}
chJobErrors := make(chan JobError)
defer close(chJobErrors)
go func() {
// First fatal job error cancels the whole pipeline.
if err := jobErrorHandler(ctx, chJobErrors); err != nil {
if ctx.Err() == nil {
cancel()
}
}
}()
// Both channels are buffered to len(batches) so feeding them never blocks.
chBatches := make(chan Batch, len(batches))
chExtractorErrors := make(chan ExtractorError, len(batches))
go func() {
extractorErrorHandler(ctx, chExtractorErrors, chBatches, chJobErrors)
}()
chChunks := make(chan Chunk, QueueSize)
maxExtractors := min(NumExtractors, len(batches))
var wgMssqlExtractors sync.WaitGroup
log.Infof("Starting %d MSSQL extractors...", maxExtractors)
extractStartTime := time.Now()
for range maxExtractors {
wgMssqlExtractors.Go(func() {
extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors, chJobErrors)
})
}
go func() {
for _, br := range batches {
chBatches <- br
}
close(chBatches)
}()
go func() {
wgMssqlExtractors.Wait()
// BUGFIX: chExtractorErrors used to be closed right after the batches
// were queued, racing with extractors that may still send to it
// (send on a closed channel panics). Only the last sender may close:
// do it after every extractor has finished.
close(chExtractorErrors)
close(chChunks)
log.Infof("Extraction completed in %v", time.Since(extractStartTime))
}()
chChunksTransform := make(chan Chunk, QueueSize)
var wgMssqlTransformers sync.WaitGroup
log.Infof("Starting %d MSSQL transformers...", maxExtractors)
transformStartTime := time.Now()
for range maxExtractors {
wgMssqlTransformers.Go(func() {
transformRowsMssql(ctx, sourceColTypes, chChunks, chChunksTransform, chJobErrors)
})
}
go func() {
// Transformers drain chChunks until it is closed by the extractor
// waiter above; close their output once all of them are done.
wgMssqlTransformers.Wait()
close(chChunksTransform)
log.Infof("Transformation completed in %v", time.Since(transformStartTime))
}()
var wgPostgresLoaders sync.WaitGroup
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
loaderStartTime := time.Now()
for range NumLoaders {
wgPostgresLoaders.Go(func() {
if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chChunksTransform); err != nil {
log.Error("Unexpected error loading data into postgres: ", err)
}
})
}
// Loaders finish once chChunksTransform is closed and drained, which
// transitively means extractors and transformers have finished too.
wgPostgresLoaders.Wait()
log.Infof("Loading completed in %v", time.Since(loaderStartTime))
totalDuration := time.Since(jobStartTime)
log.Infof("Migration job completed successfully! Total time: %v", totalDuration)
}
// logColumnTypes writes the label and then one debug line per column type,
// using %+v so struct field names appear in the output.
func logColumnTypes(columnTypes []ColumnType, label string) {
log.Debug(label)
for i := range columnTypes {
log.Debugf("%+v", columnTypes[i])
}
}
// logSampleRow logs a single row's values column-by-column at info level,
// tagged so samples from different pipeline stages can be distinguished.
func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) {
log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag)
for i, col := range columns {
// BUGFIX: guard against a row shorter than the column list so a
// malformed sample cannot panic with an index out of range.
if i >= len(rowValues) {
log.Infof("%s: <missing value>", col.Name())
continue
}
log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
}
}