Files
go-migrate/cmd/go_migrate/process.go

120 lines
3.2 KiB
Go

package main
import (
"context"
"database/sql"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job MigrationJob) {
jobStartTime := time.Now()
log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job)
if err != nil {
log.Fatal("Unexpected error: ", err)
}
logColumnTypes(sourceColTypes, "Source col types")
logColumnTypes(targetColTypes, "Target col types")
mssqlCtx := context.Background()
batches, err := batchGeneratorMssql(mssqlCtx, sourceDb, job)
if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err)
}
chJobErrors := make(chan error)
defer close(chJobErrors)
chBatches := make(chan Batch, len(batches))
chChunks := make(chan []UnknownRowValues, QueueSize)
chExtractorErrors := make(chan ExtractorError, len(batches))
maxExtractors := min(NumExtractors, len(batches))
var wgMssqlExtractors sync.WaitGroup
log.Infof("Starting %d MSSQL extractors...", maxExtractors)
extractStartTime := time.Now()
for range maxExtractors {
wgMssqlExtractors.Go(func() {
extractFromMssql(mssqlCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors)
})
}
go func() {
for _, br := range batches {
chBatches <- br
}
close(chBatches)
close(chExtractorErrors)
}()
go func() {
extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors)
}()
go func() {
wgMssqlExtractors.Wait()
close(chChunks)
log.Infof("Extraction completed in %v", time.Since(extractStartTime))
}()
chRowsTransform := make(chan []UnknownRowValues, QueueSize)
var wgMssqlTransformers sync.WaitGroup
log.Infof("Starting %d MSSQL transformers...", maxExtractors)
transformStartTime := time.Now()
for range maxExtractors {
wgMssqlTransformers.Go(func() {
transformRowsMssql(sourceColTypes, chChunks, chRowsTransform)
})
}
go func() {
wgMssqlTransformers.Wait()
close(chRowsTransform)
log.Infof("Transformation completed in %v", time.Since(transformStartTime))
}()
var wgPostgresLoaders sync.WaitGroup
postgresLoaderCtx := context.Background()
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
loaderStartTime := time.Now()
for range NumLoaders {
wgPostgresLoaders.Go(func() {
if err := loadRowsPostgres(postgresLoaderCtx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
log.Error("Unexpected error loading data into postgres: ", err)
}
})
}
wgPostgresLoaders.Wait()
log.Infof("Loading completed in %v", time.Since(loaderStartTime))
totalDuration := time.Since(jobStartTime)
log.Infof("Migration job completed successfully! Total time: %v", totalDuration)
}
func logColumnTypes(columnTypes []ColumnType, label string) {
log.Debug(label)
for _, col := range columnTypes {
log.Debugf("%+v", col)
}
}
func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) {
log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag)
for i, col := range columns {
log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
}
}