package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
_ "github.com/microsoft/go-mssqldb"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
func processMigrationJob(
|
|
ctx context.Context,
|
|
sourceDb *sql.DB,
|
|
targetDb *pgxpool.Pool,
|
|
job config.Job,
|
|
) JobResult {
|
|
result := JobResult{
|
|
JobName: job.Name,
|
|
StartTime: time.Now(),
|
|
}
|
|
|
|
var rowsRead, rowsLoaded, rowsFailed int64
|
|
|
|
sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable)
|
|
if err != nil {
|
|
result.Error = err
|
|
return result
|
|
}
|
|
|
|
logColumnTypes(sourceColTypes, "Source col types")
|
|
logColumnTypes(targetColTypes, "Target col types")
|
|
|
|
jobCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerBatch)
|
|
if err != nil {
|
|
log.Error("Unexpected error calculating batch ranges: ", err)
|
|
}
|
|
|
|
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
|
|
chBatches := make(chan models.Batch, job.QueueSize)
|
|
chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
|
|
chChunksRaw := make(chan models.Chunk, job.QueueSize)
|
|
chChunksTransformed := make(chan models.Chunk, job.QueueSize)
|
|
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
|
|
|
|
var wgActiveBatches sync.WaitGroup
|
|
var wgActiveChunks sync.WaitGroup
|
|
var wgExtractors sync.WaitGroup
|
|
var wgTransformers sync.WaitGroup
|
|
var wgLoaders sync.WaitGroup
|
|
|
|
go func() {
|
|
if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
|
|
cancel()
|
|
result.Error = err
|
|
}
|
|
}()
|
|
|
|
go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
|
|
go custom_errors.LoaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
|
|
|
|
maxExtractors := min(job.MaxExtractors, len(batches))
|
|
log.Infof("Starting %d extractor(s)...", maxExtractors)
|
|
|
|
exMssql := extractor.NewMssqlExtractor(sourceDb)
|
|
|
|
for range maxExtractors {
|
|
wgExtractors.Go(func() {
|
|
exMssql.Exec(
|
|
jobCtx,
|
|
job.SourceTable,
|
|
sourceColTypes,
|
|
job.ChunkSize,
|
|
chBatches,
|
|
chChunksRaw,
|
|
chExtractorErrors,
|
|
chJobErrors,
|
|
&wgActiveBatches,
|
|
&rowsRead,
|
|
)
|
|
})
|
|
}
|
|
|
|
wgActiveBatches.Add(len(batches))
|
|
go func() {
|
|
for _, batch := range batches {
|
|
chBatches <- batch
|
|
}
|
|
}()
|
|
|
|
log.Infof("Starting %d transformer(s)...", maxExtractors)
|
|
|
|
for range maxExtractors {
|
|
wgTransformers.Go(func() {
|
|
transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks)
|
|
})
|
|
}
|
|
|
|
log.Infof("Starting %d loader(s)...", job.MaxLoaders)
|
|
|
|
for range job.MaxLoaders {
|
|
wgLoaders.Go(func() {
|
|
loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks, &rowsLoaded)
|
|
})
|
|
}
|
|
|
|
go func() {
|
|
wgActiveBatches.Wait()
|
|
close(chBatches)
|
|
close(chExtractorErrors)
|
|
|
|
wgExtractors.Wait()
|
|
close(chChunksRaw)
|
|
|
|
wgTransformers.Wait()
|
|
|
|
wgActiveChunks.Wait()
|
|
close(chChunksTransformed)
|
|
close(chLoadersErrors)
|
|
|
|
wgLoaders.Wait()
|
|
|
|
cancel()
|
|
}()
|
|
|
|
<-jobCtx.Done()
|
|
|
|
if ctx.Err() != nil {
|
|
result.Error = ctx.Err()
|
|
}
|
|
|
|
result.Duration = time.Since(result.StartTime)
|
|
result.RowsRead = atomic.LoadInt64(&rowsRead)
|
|
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
|
|
result.RowsFailed = atomic.LoadInt64(&rowsFailed)
|
|
|
|
return result
|
|
}
|
|
|
|
func logColumnTypes(columnTypes []models.ColumnType, label string) {
|
|
log.Debug(label)
|
|
|
|
for _, col := range columnTypes {
|
|
log.Debugf("%+v", col)
|
|
}
|
|
}
|
|
|
|
func logSampleRow(
|
|
schema string,
|
|
table string,
|
|
columns []ColumnType,
|
|
rowValues models.UnknownRowValues,
|
|
tag string,
|
|
) {
|
|
log.Infof("[%s.%s] Sample row: (%s)", schema, table, tag)
|
|
for i, col := range columns {
|
|
log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
|
|
}
|
|
}
|