diff --git a/cmd/go_migrate/batch-generator.go b/cmd/go_migrate/batch-generator.go
index 19f9614..2732b54 100644
--- a/cmd/go_migrate/batch-generator.go
+++ b/cmd/go_migrate/batch-generator.go
@@ -91,15 +91,15 @@ ORDER BY batch_id`,
 	return batches, nil
 }
 
-func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) ([]Batch, error) {
+func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]Batch, error) {
 	rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo)
 	if err != nil {
 		return nil, err
 	}
 
 	var batchCount int64 = 1
-	if rowsCount > RowsPerBatch {
-		batchCount = rowsCount / RowsPerBatch
+	if rowsCount > rowsPerBatch {
+		batchCount = rowsCount / rowsPerBatch
 	} else {
 		return []Batch{{
 			Id: uuid.New(),
diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go
index f309603..6024dc0 100644
--- a/cmd/go_migrate/loader.go
+++ b/cmd/go_migrate/loader.go
@@ -76,7 +76,7 @@ func loadChunkPostgres(
 		select {
 		case chJobErrorsOut <- JobError{
 			ShouldCancelJob: true,
-			Msg:             fmt.Sprintf("Fatal data integrity error in table %s", identifier.Sanitize()),
+			Msg:             fmt.Sprintf("Fatal error in table %s", identifier.Sanitize()),
 			Prev:            err,
 		}:
 		case <-ctx.Done():
diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go
index 1417032..5175513 100644
--- a/cmd/go_migrate/process.go
+++ b/cmd/go_migrate/process.go
@@ -33,17 +33,17 @@ func processMigrationJob(
 	jobCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable)
+	batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerBatch)
 	if err != nil {
 		log.Error("Unexpected error calculating batch ranges: ", err)
 	}
 
-	chJobErrors := make(chan JobError, 50)
-	chBatches := make(chan Batch, QueueSize)
-	chExtractorErrors := make(chan ExtractorError, QueueSize)
-	chChunksRaw := make(chan Chunk, QueueSize)
-	chChunksTransformed := make(chan Chunk, QueueSize)
-	chLoadersErrors := make(chan LoaderError, QueueSize)
+	chJobErrors := make(chan JobError, job.QueueSize)
+	chBatches := make(chan Batch, job.QueueSize)
+	chExtractorErrors := make(chan ExtractorError, job.QueueSize)
+	chChunksRaw := make(chan Chunk, job.QueueSize)
+	chChunksTransformed := make(chan Chunk, job.QueueSize)
+	chLoadersErrors := make(chan LoaderError, job.QueueSize)
 
 	var wgActiveBatches sync.WaitGroup
 	var wgActiveChunks sync.WaitGroup
@@ -60,13 +60,13 @@ func processMigrationJob(
 	go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
 	go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
 
-	maxExtractors := min(NumExtractors, len(batches))
-	log.Infof("Starting %d extractors...", maxExtractors)
+	maxExtractors := min(job.MaxExtractors, len(batches))
+	log.Infof("Starting %d extractor(s)...", maxExtractors)
 	extractStartTime := time.Now()
 
 	for range maxExtractors {
 		wgExtractors.Go(func() {
-			extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
+			extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, job.ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
 		})
 	}
 
@@ -77,7 +77,7 @@ func processMigrationJob(
 		}
 	}()
 
-	log.Infof("Starting %d transformers...", maxExtractors)
+	log.Infof("Starting %d transformer(s)...", maxExtractors)
 	transformStartTime := time.Now()
 
 	for range maxExtractors {
@@ -86,10 +86,10 @@ func processMigrationJob(
 		})
 	}
 
-	log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
+	log.Infof("Starting %d loader(s)...", job.MaxLoaders)
 	loadStartTime := time.Now()
 
-	for range NumLoaders {
+	for range job.MaxLoaders {
 		wgLoaders.Go(func() {
 			loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
 		})
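
Reviewer note: this diff replaces the package-level tuning constants (RowsPerBatch, QueueSize, NumExtractors, ChunkSize, NumLoaders, plus the hard-coded 50-slot buffer on chJobErrors) with per-job settings read off the job value, so every channel now shares job.QueueSize. The job struct itself is not part of the diff; the sketch below is only a guess at the fields it must now carry. The field names after `job.` are confirmed by the diff, while the struct name, package, and types are inferred from usage and should be treated as assumptions.

    // Hypothetical sketch of the per-job tuning fields read by processMigrationJob.
    // Field names come from the job.* accesses in the diff; types are inferred:
    // the struct definition is not shown there, so everything else is assumed.
    package main

    type MigrationJob struct {
    	RowsPerBatch  int64 // int64: matches the new rowsPerBatch parameter of batchGeneratorMssql
    	QueueSize     int   // buffer size shared by all six pipeline channels
    	MaxExtractors int   // upper bound on extractor goroutines, capped by len(batches) via min(...)
    	MaxLoaders    int   // loader goroutine count; `for range job.MaxLoaders` requires Go 1.22+
    	ChunkSize     int   // rows per chunk handed to extractFromMssql (type assumed from old constant)
    }

The switch of the log messages from fixed plurals ("extractors", "transformers") to "extractor(s)"/"loader(s)" fits the same change: with worker counts now configurable per job, a count of 1 is possible.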