package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	log "github.com/sirupsen/logrus"
	"golang.org/x/sync/errgroup"
)

// processMigrationJob runs a single migration job end to end and reports the
// outcome as a JobResult.
//
// The steps are, in order:
//  1. query source and target column types concurrently;
//  2. execute every job.PreSQL statement on targetDbWrapper;
//  3. compute the source partitions;
//  4. start the extractor, transformer and loader workers plus the two error
//     handlers, and feed the partitions to the extractors;
//  5. wait until the pipeline drains, a fatal job error is reported, or ctx is
//     cancelled;
//  6. execute every job.PostSQL statement, only if step 5 ended without error;
//  7. fill in duration and row counters and flag a read/loaded mismatch.
//
// Any failure is returned in JobResult.Error; the function itself never
// panics on a failed step. All goroutines that observe localCtx are signalled
// to stop when the function returns (deferred cancel).
func processMigrationJob(
	ctx context.Context,
	targetDbWrapper dbwrapper.DbWrapper,
	sourceTableAnalyzer etl.TableAnalyzer,
	targetTableAnalyzer etl.TableAnalyzer,
	extractor etl.Extractor,
	transformer etl.Transformer,
	loader etl.Loader,
	job config.Job,
) JobResult {
	localCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	result := JobResult{
		JobName:   job.Name,
		StartTime: time.Now(),
	}

	// fail finalises the result for the setup steps that abort the job before
	// any worker has been started.
	fail := func(err error) JobResult {
		result.Error = err
		result.Duration = time.Since(result.StartTime)
		return result
	}

	// NOTE(review): rowsFailed is never handed to a worker in this function,
	// so RowsFailed is always reported as 0 — confirm whether that is intended.
	var rowsRead, rowsLoaded, rowsFailed int64

	// Source and target column types are independent lookups, so run them in
	// parallel and fail the job if either one fails.
	var wgQueryColumnTypes errgroup.Group
	var sourceColTypes, targetColTypes []models.ColumnType

	wgQueryColumnTypes.Go(func() error {
		var err error
		sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo)
		return err
	})
	wgQueryColumnTypes.Go(func() error {
		var err error
		targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo)
		return err
	})
	if err := wgQueryColumnTypes.Wait(); err != nil {
		return fail(err)
	}

	for _, query := range job.PreSQL {
		if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
			return fail(err)
		}
	}

	partitions, err := table_analyzers.PartitionRangeGenerator(
		localCtx,
		sourceTableAnalyzer,
		job.SourceTable.TableInfo,
		job.SourceTable.PrimaryKey,
		job.RowsPerPartition,
	)
	if err != nil {
		// Without partitions no extractor would be started and the job would
		// finish "successfully" with zero rows, so surface the failure.
		log.Error("Unexpected error calculating batch ranges: ", err)
		return fail(fmt.Errorf("calculating partition ranges: %w", err))
	}

	chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
	chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
	chPartitions := make(chan models.Partition, job.QueueSize)
	chBatchesRaw := make(chan models.Batch, job.QueueSize)
	chBatchesTransformed := make(chan models.Batch, job.QueueSize)

	var wgActivePartitions sync.WaitGroup
	var wgActiveBatches sync.WaitGroup
	var wgExtractors sync.WaitGroup
	var wgTransformers sync.WaitGroup
	var wgLoaders sync.WaitGroup

	// fatalErr carries the error returned by JobErrorHandler to the main
	// goroutine. It is stored before cancel() so that whoever wakes up on
	// localCtx.Done() is guaranteed to observe it.
	var (
		fatalMu  sync.Mutex
		fatalErr error
	)

	go func() {
		if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
			log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
			fatalMu.Lock()
			fatalErr = err
			fatalMu.Unlock()
			cancel()
		}
	}()

	go custom_errors.LoaderErrorHandler(
		localCtx,
		job.Retry,
		job.MaxChunkErrors,
		chLoadersErrors,
		chBatchesTransformed,
		chJobErrors,
		&wgActiveBatches,
	)

	// Never start more extractors than there are partitions to read.
	maxExtractors := min(job.MaxExtractors, len(partitions))

	log.Infof("Starting %d extractor(s)...", maxExtractors)
	for range maxExtractors {
		wgExtractors.Go(func() {
			extractor.Consume(
				localCtx,
				job.SourceTable,
				sourceColTypes,
				job.BatchSize,
				chPartitions,
				chBatchesRaw,
				chJobErrors,
				&wgActivePartitions,
				&rowsRead,
			)
		})
	}

	// Register every partition before the first one is published so the
	// coordinator below cannot observe an empty wait group too early.
	wgActivePartitions.Add(len(partitions))
	go func() {
		for _, partition := range partitions {
			select {
			case chPartitions <- partition:
			case <-localCtx.Done():
				// Job cancelled: stop feeding instead of blocking forever on
				// a channel nobody reads any more.
				return
			}
		}
	}()

	// The transformer pool is sized like the extractor pool.
	log.Infof("Starting %d transformer(s)...", maxExtractors)
	for range maxExtractors {
		wgTransformers.Go(func() {
			transformer.Exec(
				localCtx,
				sourceColTypes,
				chBatchesRaw,
				chBatchesTransformed,
				chJobErrors,
				&wgActiveBatches,
			)
		})
	}

	log.Infof("Starting %d loader(s)...", job.MaxLoaders)
	for range job.MaxLoaders {
		wgLoaders.Go(func() {
			loader.Exec(
				localCtx,
				job.TargetTable,
				targetColTypes,
				chBatchesTransformed,
				chLoadersErrors,
				chJobErrors,
				&wgActiveBatches,
				&rowsLoaded,
			)
		})
	}

	// Coordinator: shuts the pipeline down stage by stage, closing each
	// channel only after everything that writes to it has finished, then
	// signals completion through pipelineDone.
	pipelineDone := make(chan struct{})
	go func() {
		log.Debugf("Waiting for goroutines (%v)", job.Name)

		wgActivePartitions.Wait()
		log.Debugf("wgActivePartitions is empty (%v)", job.Name)
		close(chPartitions)
		log.Debugf("chPartitions is closed (%v)", job.Name)

		wgExtractors.Wait()
		log.Debugf("wgExtractors is empty (%v)", job.Name)
		close(chBatchesRaw)
		log.Debugf("chBatchesRaw is closed (%v)", job.Name)

		wgTransformers.Wait()
		log.Debugf("wgTransformers is empty (%v)", job.Name)

		wgActiveBatches.Wait()
		log.Debugf("wgActiveBatches is empty (%v)", job.Name)
		close(chBatchesTransformed)
		log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
		close(chLoadersErrors)
		log.Debugf("chLoadersErrors is empty (%v)", job.Name)

		wgLoaders.Wait()
		log.Debugf("wgLoaders is empty (%v)", job.Name)

		close(pipelineDone)
	}()

	// Block until the pipeline has drained or the job has been cancelled
	// (by the caller through ctx, or by the job error handler above).
	log.Debugf("waiting for pipeline to finish (%v)", job.Name)
	select {
	case <-pipelineDone:
	case <-localCtx.Done():
	}
	log.Debugf("pipeline finished (%v)", job.Name)

	fatalMu.Lock()
	jobErr := fatalErr
	fatalMu.Unlock()

	// A cancellation coming from the caller takes precedence over a fatal
	// job error.
	switch {
	case ctx.Err() != nil:
		result.Error = ctx.Err()
	case jobErr != nil:
		result.Error = jobErr
	}

	// Post-SQL must only run once the load has finished, and is skipped for a
	// failed job. localCtx is still live here because cancel is deferred.
	if result.Error == nil {
		for _, query := range job.PostSQL {
			if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
				result.Error = err
				break
			}
		}
	}

	result.Duration = time.Since(result.StartTime)
	result.RowsRead = atomic.LoadInt64(&rowsRead)
	result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
	result.RowsFailed = atomic.LoadInt64(&rowsFailed)

	// Report a mismatch only when nothing else went wrong, so the root cause
	// (cancellation, fatal error, failed post-SQL) is not overwritten.
	if result.Error == nil && result.RowsRead != result.RowsLoaded {
		result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)",
			result.RowsRead, result.RowsLoaded, result.RowsFailed)
	}

	return result
}