go-migrate/cmd/go_migrate/process.go
package main

import (
	"context"
	"database/sql"
	"sync"
	"sync/atomic"
	"time"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/jackc/pgx/v5/pgxpool"
	_ "github.com/microsoft/go-mssqldb"
	log "github.com/sirupsen/logrus"
)
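
// processMigrationJob runs the full ETL pipeline for a single job: extractor
// goroutines pull row batches from the MSSQL source, transformers convert the
// raw chunks, and loaders write them into the Postgres target. The call
// blocks until the pipeline drains or the context is cancelled, then returns
// a JobResult with row counts and timing.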
func processMigrationJob(
	ctx context.Context,
	sourceDb *sql.DB,
	targetDb *pgxpool.Pool,
	job config.Job,
) JobResult {
	result := JobResult{
		JobName:   job.Name,
		StartTime: time.Now(),
	}
	var rowsRead, rowsLoaded, rowsFailed int64

	sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable)
	if err != nil {
		result.Error = err
		return result
	}
	logColumnTypes(sourceColTypes, "Source col types")
	logColumnTypes(targetColTypes, "Target col types")

	jobCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerBatch)
	if err != nil {
		// Without batch ranges there is no work to schedule; fail the job
		// instead of silently reporting success with zero rows.
		log.Error("Unexpected error calculating batch ranges: ", err)
		result.Error = err
		return result
	}

	// All stage channels are bounded by job.QueueSize, which applies
	// backpressure between stages.
	chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
	chBatches := make(chan models.Batch, job.QueueSize)
	chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
	chChunksRaw := make(chan models.Chunk, job.QueueSize)
	chChunksTransformed := make(chan models.Chunk, job.QueueSize)
	chLoadersErrors := make(chan LoaderError, job.QueueSize)

	var wgActiveBatches sync.WaitGroup // batches still in flight
	var wgActiveChunks sync.WaitGroup  // chunks between transform and load
	var wgExtractors sync.WaitGroup    // extractor goroutines
	var wgTransformers sync.WaitGroup  // transformer goroutines
	var wgLoaders sync.WaitGroup       // loader goroutines
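
	// Pipeline topology:
	//   batches -> extractors -> chChunksRaw -> transformers
	//           -> chChunksTransformed -> loaders -> Postgres
	// Each error channel is drained by its own handler goroutine below.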
	// JobErrorHandler drains fatal errors; the first one aborts the pipeline.
	// result.Error is written before cancel() so the write is published to
	// the read that follows <-jobCtx.Done().
	go func() {
		if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
			result.Error = err
			cancel()
		}
	}()
	go custom_errors.ExtractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
	go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)

	// Size the extractor pool to the work actually available.
	maxExtractors := min(job.MaxExtractors, len(batches))
	log.Infof("Starting %d extractor(s)...", maxExtractors)
	exMssql := extractor.NewMssqlExtractor(sourceDb)
	for range maxExtractors {
		wgExtractors.Go(func() {
			exMssql.Exec(
				jobCtx,
				job.SourceTable,
				sourceColTypes,
				job.ChunkSize,
				chBatches,
				chChunksRaw,
				chExtractorErrors,
				chJobErrors,
				&wgActiveBatches,
				&rowsRead,
			)
		})
	}

	// Register every batch up front so the shutdown goroutine below cannot
	// observe a zero count before all batches have been fed.
	wgActiveBatches.Add(len(batches))
	go func() {
		for _, batch := range batches {
			chBatches <- batch
		}
	}()
log.Infof("Starting %d transformer(s)...", maxExtractors)
for range maxExtractors {
wgTransformers.Go(func() {
transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks)
})
}
log.Infof("Starting %d loader(s)...", job.MaxLoaders)
for range job.MaxLoaders {
wgLoaders.Go(func() {
loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks, &rowsLoaded)
})
}

	// Once every batch is accounted for, shut the stages down in order and
	// cancel jobCtx to unblock the wait below.
	go func() {
		wgActiveBatches.Wait()
		close(chBatches)
		close(chExtractorErrors)
		wgExtractors.Wait()
		close(chChunksRaw)
		wgTransformers.Wait()
		wgActiveChunks.Wait()
		close(chChunksTransformed)
		close(chLoadersErrors)
		wgLoaders.Wait()
		cancel()
	}()

	<-jobCtx.Done()
	// A cancelled parent context takes precedence over any pipeline error.
	if ctx.Err() != nil {
		result.Error = ctx.Err()
	}
	result.Duration = time.Since(result.StartTime)
	result.RowsRead = atomic.LoadInt64(&rowsRead)
	result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
	result.RowsFailed = atomic.LoadInt64(&rowsFailed)
	return result
}

func logColumnTypes(columnTypes []models.ColumnType, label string) {
	log.Debug(label)
	for _, col := range columnTypes {
		log.Debugf("%+v", col)
	}
}

func logSampleRow(
	schema string,
	table string,
	columns []ColumnType,
	rowValues models.UnknownRowValues,
	tag string,
) {
	log.Infof("[%s.%s] Sample row: (%s)", schema, table, tag)
	for i, col := range columns {
		log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
	}
}
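
// The helper below is a minimal sketch of how processMigrationJob could be
// driven by a caller, assuming the caller has already opened both database
// connections and parsed its jobs from config. runJobsExample is a
// hypothetical name for illustration, not part of this package's real entry
// point.
func runJobsExample(ctx context.Context, sourceDb *sql.DB, targetDb *pgxpool.Pool, jobs []config.Job) {
	for _, job := range jobs {
		res := processMigrationJob(ctx, sourceDb, targetDb, job)
		if res.Error != nil {
			log.Errorf("job %s failed after %s: %v", res.JobName, res.Duration, res.Error)
			continue
		}
		log.Infof("job %s: read=%d loaded=%d failed=%d in %s",
			res.JobName, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
	}
}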