diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go index 26f97e2..250e5d0 100644 --- a/cmd/go_migrate/extractor.go +++ b/cmd/go_migrate/extractor.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" @@ -14,6 +15,13 @@ import ( type UnknownRowValues = []any +type Chunk struct { + Id uuid.UUID + BatchId uuid.UUID + Data []UnknownRowValues + RetryCounter int +} + func extractFromMssql( ctx context.Context, db *sql.DB, @@ -21,7 +29,7 @@ func extractFromMssql( columns []ColumnType, chunkSize int, chBatchesIn <-chan Batch, - chChunksOut chan<- []UnknownRowValues, + chChunksOut chan<- Chunk, chErrorsOut chan<- ExtractorError, chJobErrorsOut chan<- JobError, ) { @@ -34,7 +42,11 @@ func extractFromMssql( ShouldCancelJob: true, Msg: "Primary key not found in provided columns", } - chJobErrorsOut <- jobError + select { + case chJobErrorsOut <- jobError: + case <-ctx.Done(): + return + } return } @@ -89,8 +101,13 @@ func extractFromMssql( } lastRow := rowsChunk[len(rowsChunk)-1] - chChunksOut <- rowsChunk chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err) + chChunksOut <- Chunk{ + Id: uuid.New(), + BatchId: batch.Id, + Data: rowsChunk, + RetryCounter: 0, + } return } @@ -102,7 +119,12 @@ func extractFromMssql( rowsPerSec := float64(chunkSize) / chunkDuration.Seconds() log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted) - chChunksOut <- rowsChunk + chChunksOut <- Chunk{ + Id: uuid.New(), + BatchId: batch.Id, + Data: rowsChunk, + RetryCounter: 0, + } rowsChunk = make([]UnknownRowValues, 0, chunkSize) chunkStartTime = time.Now() } @@ -113,7 +135,12 @@ func extractFromMssql( rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds() log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted) - chChunksOut <- rowsChunk + chChunksOut <- Chunk{ + Id: uuid.New(), + BatchId: batch.Id, + Data: rowsChunk, + RetryCounter: 0, + } } if err := rows.Err(); err != nil { diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 4589dac..5ecc68d 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -48,7 +48,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors) }() - chChunks := make(chan []UnknownRowValues, QueueSize) + chChunks := make(chan Chunk, QueueSize) maxExtractors := min(NumExtractors, len(batches)) var wgMssqlExtractors sync.WaitGroup diff --git a/cmd/go_migrate/transformer.go b/cmd/go_migrate/transformer.go index 21fa033..5fa850a 100644 --- a/cmd/go_migrate/transformer.go +++ b/cmd/go_migrate/transformer.go @@ -8,18 +8,18 @@ import ( func transformRowsMssql( columns []ColumnType, - chChunksIn <-chan []UnknownRowValues, + chChunksIn <-chan Chunk, chChunksOut chan<- []UnknownRowValues, chJobErrorsOut chan<- JobError, ) { chunkCount := 0 totalRowsTransformed := 0 - for rows := range chChunksIn { + for chunk := range chChunksIn { chunkStartTime := time.Now() - log.Debugf("Chunk received, transforming %d rows...", len(rows)) + log.Debugf("Chunk received, transforming %d rows...", len(chunk.Data)) - for _, rowValues := range rows { + for _, rowValues := range chunk.Data { for i, col := range columns { value := rowValues[i] @@ -61,12 +61,12 @@ func transformRowsMssql( } chunkCount++ - totalRowsTransformed += len(rows) + totalRowsTransformed += len(chunk.Data) chunkDuration := time.Since(chunkStartTime) - rowsPerSec := float64(len(rows)) / chunkDuration.Seconds() + rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds() log.Infof("Transformed chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", - len(rows), chunkDuration, rowsPerSec, totalRowsTransformed) + len(chunk.Data), chunkDuration, rowsPerSec, totalRowsTransformed) - chChunksOut <- rows + chChunksOut <- chunk.Data } }