232 lines
5.5 KiB
Go
232 lines
5.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
log "github.com/sirupsen/logrus"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func processMigrationJob(
|
|
ctx context.Context,
|
|
targetDbWrapper dbwrapper.DbWrapper,
|
|
sourceTableAnalyzer etl.TableAnalyzer,
|
|
targetTableAnalyzer etl.TableAnalyzer,
|
|
extractor etl.Extractor,
|
|
transformer etl.Transformer,
|
|
loader etl.Loader,
|
|
job config.Job,
|
|
) JobResult {
|
|
localCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
result := JobResult{
|
|
JobName: job.Name,
|
|
StartTime: time.Now(),
|
|
}
|
|
|
|
var rowsRead, rowsLoaded, rowsFailed int64
|
|
|
|
var wgQueryColumnTypes errgroup.Group
|
|
var sourceColTypes, targetColTypes []models.ColumnType
|
|
|
|
wgQueryColumnTypes.Go(func() error {
|
|
var err error
|
|
sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
wgQueryColumnTypes.Go(func() error {
|
|
var err error
|
|
targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
err := wgQueryColumnTypes.Wait()
|
|
if err != nil {
|
|
result.Error = err
|
|
return result
|
|
}
|
|
|
|
for _, query := range job.PreSQL {
|
|
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
|
|
result.Error = err
|
|
return result
|
|
}
|
|
}
|
|
|
|
partitions, err := table_analyzers.PartitionRangeGenerator(
|
|
localCtx,
|
|
sourceTableAnalyzer,
|
|
job.SourceTable.TableInfo,
|
|
job.SourceTable.PrimaryKey,
|
|
job.RowsPerPartition,
|
|
)
|
|
if err != nil {
|
|
log.Error("Unexpected error calculating batch ranges: ", err)
|
|
}
|
|
|
|
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
|
|
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
|
|
chPartitions := make(chan models.Partition, job.QueueSize)
|
|
chBatchesRaw := make(chan models.Batch, job.QueueSize)
|
|
chBatchesTransformed := make(chan models.Batch, job.QueueSize)
|
|
|
|
var wgActivePartitions sync.WaitGroup
|
|
var wgActiveBatches sync.WaitGroup
|
|
var wgExtractors sync.WaitGroup
|
|
var wgTransformers sync.WaitGroup
|
|
var wgLoaders sync.WaitGroup
|
|
|
|
go func() {
|
|
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
|
|
log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
|
|
cancel()
|
|
result.Error = err
|
|
}
|
|
}()
|
|
|
|
go custom_errors.LoaderErrorHandler(
|
|
localCtx,
|
|
job.Retry,
|
|
job.MaxChunkErrors,
|
|
chLoadersErrors,
|
|
chBatchesTransformed,
|
|
chJobErrors,
|
|
&wgActiveBatches,
|
|
)
|
|
|
|
maxExtractors := min(job.MaxExtractors, len(partitions))
|
|
log.Infof("Starting %d extractor(s)...", maxExtractors)
|
|
|
|
for range maxExtractors {
|
|
wgExtractors.Go(func() {
|
|
extractors.Consume(
|
|
localCtx,
|
|
extractor,
|
|
job.SourceTable,
|
|
sourceColTypes,
|
|
job.BatchSize,
|
|
chPartitions,
|
|
chBatchesRaw,
|
|
chJobErrors,
|
|
&wgActivePartitions,
|
|
&rowsRead,
|
|
)
|
|
})
|
|
}
|
|
|
|
wgActivePartitions.Add(len(partitions))
|
|
go func() {
|
|
for _, batch := range partitions {
|
|
chPartitions <- batch
|
|
}
|
|
}()
|
|
|
|
log.Infof("Starting %d transformer(s)...", maxExtractors)
|
|
|
|
for range maxExtractors {
|
|
wgTransformers.Go(func() {
|
|
transformer.Exec(
|
|
localCtx,
|
|
sourceColTypes,
|
|
chBatchesRaw,
|
|
chBatchesTransformed,
|
|
chJobErrors,
|
|
&wgActiveBatches,
|
|
)
|
|
})
|
|
}
|
|
|
|
log.Infof("Starting %d loader(s)...", job.MaxLoaders)
|
|
|
|
for range job.MaxLoaders {
|
|
wgLoaders.Go(func() {
|
|
loader.Exec(
|
|
localCtx,
|
|
job.TargetTable,
|
|
targetColTypes,
|
|
chBatchesTransformed,
|
|
chLoadersErrors,
|
|
chJobErrors,
|
|
&wgActiveBatches,
|
|
&rowsLoaded,
|
|
)
|
|
})
|
|
}
|
|
|
|
go func() {
|
|
log.Debugf("Waiting for goroutines (%v)", job.Name)
|
|
|
|
wgActivePartitions.Wait()
|
|
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
|
|
close(chPartitions)
|
|
log.Debugf("chPartitions is closed (%v)", job.Name)
|
|
|
|
wgExtractors.Wait()
|
|
log.Debugf("wgExtractors is empty (%v)", job.Name)
|
|
close(chBatchesRaw)
|
|
log.Debugf("chBatchesRaw is closed (%v)", job.Name)
|
|
|
|
wgTransformers.Wait()
|
|
log.Debugf("wgTransformers is empty (%v)", job.Name)
|
|
|
|
wgActiveBatches.Wait()
|
|
log.Debugf("wgActiveBatches is empty (%v)", job.Name)
|
|
close(chBatchesTransformed)
|
|
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
|
|
close(chLoadersErrors)
|
|
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
|
|
|
|
wgLoaders.Wait()
|
|
log.Debugf("wgLoaders is empty (%v)", job.Name)
|
|
|
|
cancel()
|
|
}()
|
|
|
|
for _, query := range job.PostSQL {
|
|
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
|
|
result.Error = err
|
|
return result
|
|
}
|
|
}
|
|
|
|
log.Debugf("waiting for local context to be done (%v)", job.Name)
|
|
<-localCtx.Done()
|
|
log.Debugf("local context done (%v)", job.Name)
|
|
|
|
if ctx.Err() != nil {
|
|
result.Error = ctx.Err()
|
|
}
|
|
|
|
result.Duration = time.Since(result.StartTime)
|
|
result.RowsRead = atomic.LoadInt64(&rowsRead)
|
|
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
|
|
result.RowsFailed = atomic.LoadInt64(&rowsFailed)
|
|
|
|
if result.RowsRead != result.RowsLoaded {
|
|
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
|
|
}
|
|
|
|
return result
|
|
}
|