refactor: rename Batch to Partition and Chunk to Batch in error handling and processing functions for consistency

2026-04-11 00:30:54 -05:00
parent 9eb9821daf
commit 955bc65ce9
10 changed files with 151 additions and 151 deletions
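
After this change, a Partition is the primary-key range handed to an extractor, and a Batch is the group of extracted rows that flows through the transformer to the loader (what the old code called a chunk). A sketch of the two models as they stand after the rename: models.Batch mirrors the struct at the end of this diff, while the Partition fields are inferred from the code that reads and writes them, not copied from a committed definition.

package models

import "github.com/google/uuid"

type UnknownRowValues = []any

// Partition: a primary-key range assigned to one extractor run.
// Field set reconstructed from usage in this diff (an assumption).
type Partition struct {
    Id                    uuid.UUID
    ParentId              uuid.UUID // parent partition when this one resumes a failed run
    LowerLimit            int64
    UpperLimit            int64
    IsLowerLimitInclusive bool
    ShouldUseRange        bool
    RetryCounter          int
}

// Batch: a slice of extracted rows traveling extractor -> transformer -> loader.
type Batch struct {
    Id           uuid.UUID
    PartitionId  uuid.UUID
    Rows         []UnknownRowValues
    RetryCounter int
}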

View File

@@ -82,7 +82,7 @@ ORDER BY batch_id`,
 return batches, nil
 }
-func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]models.Partition, error) {
+func partitionGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]models.Partition, error) {
 rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo)
 if err != nil {
 return nil, err
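
partitionGeneratorMssql estimates the table's total row count and slices it into partitions of roughly rowsPerBatch rows each. A sketch of the sizing arithmetic only, under the assumption that the real key ranges come from the ORDER BY batch_id query above:

package main

import "fmt"

// partitionCount is ceiling division: how many partitions a table of
// rowsCount rows needs at rowsPerBatch rows apiece.
func partitionCount(rowsCount, rowsPerBatch int64) int64 {
    if rowsPerBatch <= 0 {
        return 0
    }
    return (rowsCount + rowsPerBatch - 1) / rowsPerBatch
}

func main() {
    fmt.Println(partitionCount(1_000_001, 100_000)) // 11
}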

View File

@@ -44,20 +44,20 @@ func processMigrationJob(
 jobCtx, cancel := context.WithCancel(ctx)
 defer cancel()
-batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerBatch)
+partitions, err := partitionGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerBatch)
 if err != nil {
 log.Error("Unexpected error calculating batch ranges: ", err)
 }
 chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
-chBatches := make(chan models.Partition, job.QueueSize)
 chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
-chChunksRaw := make(chan models.Batch, job.QueueSize)
-chChunksTransformed := make(chan models.Batch, job.QueueSize)
 chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
+chPartitions := make(chan models.Partition, job.QueueSize)
+chBatchesRaw := make(chan models.Batch, job.QueueSize)
+chBatchesTransformed := make(chan models.Batch, job.QueueSize)
+var wgActivePartitions sync.WaitGroup
 var wgActiveBatches sync.WaitGroup
-var wgActiveChunks sync.WaitGroup
 var wgExtractors sync.WaitGroup
 var wgTransformers sync.WaitGroup
 var wgLoaders sync.WaitGroup
@@ -69,10 +69,10 @@ func processMigrationJob(
 }
 }()
-go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
+go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chPartitions, chJobErrors, &wgActivePartitions)
-go custom_errors.LoaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
+go custom_errors.LoaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chBatchesTransformed, chJobErrors, &wgActiveBatches)
-maxExtractors := min(job.MaxExtractors, len(batches))
+maxExtractors := min(job.MaxExtractors, len(partitions))
 log.Infof("Starting %d extractor(s)...", maxExtractors)
 for range maxExtractors {
@@ -82,20 +82,20 @@ func processMigrationJob(
 job.SourceTable,
 sourceColTypes,
 job.ChunkSize,
-chBatches,
+chPartitions,
-chChunksRaw,
+chBatchesRaw,
 chExtractorErrors,
 chJobErrors,
-&wgActiveBatches,
+&wgActivePartitions,
 &rowsRead,
 )
 })
 }
-wgActiveBatches.Add(len(batches))
+wgActivePartitions.Add(len(partitions))
 go func() {
-for _, batch := range batches {
+for _, batch := range partitions {
-chBatches <- batch
+chPartitions <- batch
 }
 }()
@@ -106,10 +106,10 @@ func processMigrationJob(
 transformer.Exec(
 jobCtx,
 sourceColTypes,
-chChunksRaw,
+chBatchesRaw,
-chChunksTransformed,
+chBatchesTransformed,
 chJobErrors,
-&wgActiveChunks,
+&wgActiveBatches,
 )
 })
 }
@@ -122,27 +122,27 @@ func processMigrationJob(
 jobCtx,
 job.TargetTable,
 targetColTypes,
-chChunksTransformed,
+chBatchesTransformed,
 chLoadersErrors,
 chJobErrors,
-&wgActiveChunks,
+&wgActiveBatches,
 &rowsLoaded,
 )
 })
 }
 go func() {
-wgActiveBatches.Wait()
+wgActivePartitions.Wait()
-close(chBatches)
+close(chPartitions)
 close(chExtractorErrors)
 wgExtractors.Wait()
-close(chChunksRaw)
+close(chBatchesRaw)
 wgTransformers.Wait()
-wgActiveChunks.Wait()
+wgActiveBatches.Wait()
-close(chChunksTransformed)
+close(chBatchesTransformed)
 close(chLoadersErrors)
 wgLoaders.Wait()
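
The final goroutine encodes the pipeline's shutdown order: once every partition is accounted for, the partition channel can be closed, which lets the extractors drain and exit, which in turn makes it safe to close the raw-batch channel, and so on downstream. A stripped-down sketch of that stage-by-stage close pattern, with placeholder stages standing in for the real extractors and loaders:

package main

import (
    "fmt"
    "sync"
)

func main() {
    chIn := make(chan int, 4)
    chOut := make(chan int, 4)
    var wgProducers, wgWorkers sync.WaitGroup

    wgProducers.Add(1)
    go func() {
        defer wgProducers.Done()
        for i := range 3 {
            chIn <- i
        }
    }()

    wgWorkers.Add(1)
    go func() {
        defer wgWorkers.Done()
        for v := range chIn { // exits when chIn is closed
            chOut <- v * v
        }
    }()

    go func() {
        wgProducers.Wait() // no producer can send anymore...
        close(chIn)        // ...so closing is safe
        wgWorkers.Wait()   // workers have drained chIn
        close(chOut)       // downstream can now range to completion
    }()

    for v := range chOut {
        fmt.Println(v)
    }
}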

View File

@@ -10,7 +10,7 @@ import (
 )
 type ExtractorError struct {
-Batch models.Partition
+Partition models.Partition
 LastId int64
 HasLastId bool
 Msg string
@@ -24,9 +24,9 @@ func ExtractorErrorHandler(
 ctx context.Context,
 maxRetryAttempts int,
 chErrorsIn <-chan ExtractorError,
-chBatchesOut chan<- models.Partition,
+chPartitionsOut chan<- models.Partition,
 chJobErrorsOut chan<- JobError,
-wgActiveBatches *sync.WaitGroup,
+wgActivePartitions *sync.WaitGroup,
 ) {
 for {
 if ctx.Err() != nil {
@@ -42,10 +42,10 @@ func ExtractorErrorHandler(
 return
 }
-if err.Batch.RetryCounter >= maxRetryAttempts {
+if err.Partition.RetryCounter >= maxRetryAttempts {
 jobError := JobError{
 ShouldCancelJob: false,
-Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Batch.Id, maxRetryAttempts),
+Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, maxRetryAttempts),
 Prev: &err,
 }
@@ -55,22 +55,22 @@ func ExtractorErrorHandler(
 return
 }
-wgActiveBatches.Done()
+wgActivePartitions.Done()
 continue
 }
-newBatch := err.Batch
+newPartition := err.Partition
-newBatch.RetryCounter++
+newPartition.RetryCounter++
 if err.HasLastId {
-newBatch.ParentId = err.Batch.Id
+newPartition.ParentId = err.Partition.Id
-newBatch.Id = uuid.New()
+newPartition.Id = uuid.New()
-newBatch.LowerLimit = err.LastId
+newPartition.LowerLimit = err.LastId
-newBatch.IsLowerLimitInclusive = false
+newPartition.IsLowerLimitInclusive = false
 }
 select {
-case chBatchesOut <- newBatch:
+case chPartitionsOut <- newPartition:
 case <-ctx.Done():
 return
 }
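
The handler above makes retries resumable: a failed partition that reported the last primary key it delivered is replaced by a child partition covering only the unread tail of the range, with an exclusive lower bound so the last delivered row is not extracted twice. A trimmed sketch of that branch, with the types reduced to the fields this logic touches:

package main

import (
    "fmt"

    "github.com/google/uuid"
)

type Partition struct {
    Id                    uuid.UUID
    ParentId              uuid.UUID
    LowerLimit            int64
    IsLowerLimitInclusive bool
    RetryCounter          int
}

// resumePartition mirrors the HasLastId branch: same range tail, fresh
// identity, exclusive lower bound starting at the last delivered key.
func resumePartition(failed Partition, lastId int64) Partition {
    next := failed
    next.RetryCounter++
    next.ParentId = failed.Id
    next.Id = uuid.New()
    next.LowerLimit = lastId
    next.IsLowerLimitInclusive = false
    return next
}

func main() {
    p := Partition{Id: uuid.New(), LowerLimit: 0, IsLowerLimitInclusive: true}
    fmt.Printf("%+v\n", resumePartition(p, 4200))
}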

View File

@@ -21,9 +21,9 @@ func LoaderErrorHandler(
 ctx context.Context,
 maxRetryAttempts int,
 chErrorsIn <-chan LoaderError,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 chJobErrorsOut chan<- JobError,
-wgActiveChunks *sync.WaitGroup,
+wgActiveBatches *sync.WaitGroup,
 ) {
 for {
 if ctx.Err() != nil {
@@ -42,7 +42,7 @@ func LoaderErrorHandler(
 if err.RetryCounter >= maxRetryAttempts {
 jobError := JobError{
 ShouldCancelJob: false,
-Msg: fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts),
+Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
 Prev: &err,
 }
@@ -52,14 +52,14 @@ func LoaderErrorHandler(
 return
 }
-wgActiveChunks.Done()
+wgActiveBatches.Done()
 continue
 }
 err.RetryCounter++
 select {
-case chChunksOut <- err.Batch:
+case chBatchesOut <- err.Batch:
 case <-ctx.Done():
 return
 }

View File

@@ -70,20 +70,20 @@ func buildExtractQueryMssql(
 return sbQuery.String()
 }
-func extractorErrorFromLastRowMssql(
+func errorFromLastRow(
 lastRow models.UnknownRowValues,
 indexPrimaryKey int,
-batch *models.Partition,
+partition *models.Partition,
 previousError error,
 ) *custom_errors.ExtractorError {
 lastIdRawValue := lastRow[indexPrimaryKey]
 lastId, ok := convert.ToInt64(lastIdRawValue)
 if !ok {
-currentBatch := *batch
+currentPartition := *partition
-currentBatch.RetryCounter = 3
+currentPartition.RetryCounter = 3
 return &custom_errors.ExtractorError{
-Batch: currentBatch,
+Partition: currentPartition,
 HasLastId: true,
 Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
 }
@@ -91,78 +91,78 @@ func extractorErrorFromLastRowMssql(
 }
 return &custom_errors.ExtractorError{
-Batch: *batch,
+Partition: *partition,
 HasLastId: true,
 LastId: lastId,
 Msg: previousError.Error(),
 }
 }
-func (mssqlEx *MssqlExtractor) ProcessBatch(
+func (mssqlEx *MssqlExtractor) ProcessPartition(
 ctx context.Context,
 tableInfo config.SourceTableInfo,
 columns []models.ColumnType,
-chunkSize int,
+batchSize int,
-batch models.Partition,
+partition models.Partition,
 indexPrimaryKey int,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 rowsRead *int64,
 ) error {
-query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
+query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive)
 var queryArgs []any
-if batch.ShouldUseRange {
+if partition.ShouldUseRange {
 queryArgs = append(queryArgs,
-sql.Named("min", batch.LowerLimit),
+sql.Named("min", partition.LowerLimit),
-sql.Named("max", batch.UpperLimit),
+sql.Named("max", partition.UpperLimit),
 )
 }
 rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...)
 if err != nil {
-return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 }
 defer rows.Close()
-rowsChunk := make([]models.UnknownRowValues, 0, chunkSize)
+batchRows := make([]models.UnknownRowValues, 0, batchSize)
 for rows.Next() {
-values := make([]any, len(columns))
+rowValues := make([]any, len(columns))
 scanArgs := make([]any, len(columns))
-for i := range values {
+for i := range rowValues {
-scanArgs[i] = &values[i]
+scanArgs[i] = &rowValues[i]
 }
 if err := rows.Scan(scanArgs...); err != nil {
-if len(rowsChunk) == 0 {
+if len(batchRows) == 0 {
-return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 }
-lastRow := rowsChunk[len(rowsChunk)-1]
+lastRow := batchRows[len(batchRows)-1]
 select {
-case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 case <-ctx.Done():
 return nil
 }
-atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+atomic.AddInt64(rowsRead, int64(len(batchRows)))
-return extractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
+return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err)
 }
-rowsChunk = append(rowsChunk, values)
+batchRows = append(batchRows, rowValues)
-if len(rowsChunk) >= chunkSize {
+if len(batchRows) >= batchSize {
 select {
-case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 case <-ctx.Done():
 return nil
 }
-atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+atomic.AddInt64(rowsRead, int64(len(batchRows)))
-rowsChunk = make([]models.UnknownRowValues, 0, chunkSize)
+batchRows = make([]models.UnknownRowValues, 0, batchSize)
 }
 }
@@ -171,22 +171,22 @@ func (mssqlEx *MssqlExtractor) ProcessBatch(
 return ctx.Err()
 }
-if len(rowsChunk) == 0 {
+if len(batchRows) == 0 {
-return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 }
-lastRow := rowsChunk[len(rowsChunk)-1]
+lastRow := batchRows[len(batchRows)-1]
-return extractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
+return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err)
 }
-if len(rowsChunk) > 0 {
+if len(batchRows) > 0 {
 select {
-case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 case <-ctx.Done():
 return nil
 }
-atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+atomic.AddInt64(rowsRead, int64(len(batchRows)))
 }
 return nil
@@ -196,12 +196,12 @@ func (mssqlEx *MssqlExtractor) Exec(
 ctx context.Context,
 tableInfo config.SourceTableInfo,
 columns []models.ColumnType,
-chunkSize int,
+batchSize int,
-chBatchesIn <-chan models.Partition,
+chPartitionsIn <-chan models.Partition,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 chErrorsOut chan<- custom_errors.ExtractorError,
 chJobErrorsOut chan<- custom_errors.JobError,
-wgActiveBatches *sync.WaitGroup,
+wgActivePartitions *sync.WaitGroup,
 rowsRead *int64,
 ) {
 indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
@@ -229,19 +229,19 @@ func (mssqlEx *MssqlExtractor) Exec(
 select {
 case <-ctx.Done():
 return
-case batch, ok := <-chBatchesIn:
+case partition, ok := <-chPartitionsIn:
 if !ok {
 return
 }
-err := mssqlEx.ProcessBatch(
+err := mssqlEx.ProcessPartition(
 ctx,
 tableInfo,
 columns,
-chunkSize,
+batchSize,
-batch,
+partition,
 indexPrimaryKey,
-chChunksOut,
+chBatchesOut,
 rowsRead,
 )
@@ -267,7 +267,7 @@ func (mssqlEx *MssqlExtractor) Exec(
 return
 }
-wgActiveBatches.Done()
+wgActivePartitions.Done()
 }
 }
 }
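
ProcessPartition buffers scanned rows until batchSize is reached, ships the buffer downstream as one models.Batch, and then allocates a fresh buffer rather than reusing the sent slice; any final partial batch is flushed after the loop. A minimal sketch of that accumulate-and-flush pattern with a faked row source:

package main

import "fmt"

type row = []any

func emitBatches(rows []row, batchSize int, out chan<- []row) {
    buf := make([]row, 0, batchSize)
    for _, r := range rows {
        buf = append(buf, r)
        if len(buf) >= batchSize {
            out <- buf
            buf = make([]row, 0, batchSize) // fresh buffer: the sent slice must not be mutated
        }
    }
    if len(buf) > 0 { // final partial batch
        out <- buf
    }
    close(out)
}

func main() {
    out := make(chan []row, 3)
    go emitBatches([]row{{1}, {2}, {3}, {4}, {5}}, 2, out)
    for b := range out {
        fmt.Println(len(b)) // 2, 2, 1
    }
}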

View File

@@ -52,29 +52,29 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
 return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
 }
-func (postgresEx *PostgresExtractor) ProcessBatch(
+func (postgresEx *PostgresExtractor) ProcessPartition(
 ctx context.Context,
 tableInfo config.SourceTableInfo,
 columns []models.ColumnType,
-chunkSize int,
+batchSize int,
-batch models.Partition,
+partition models.Partition,
 indexPrimaryKey int,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 rowsRead *int64,
 ) error {
 query := buildExtractQueryPostgres(tableInfo, columns)
-if batch.ShouldUseRange {
+if partition.ShouldUseRange {
 return errors.New("Batch config not yet supported")
 }
 rows, err := postgresEx.db.Query(ctx, query)
 if err != nil {
-return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 }
 defer rows.Close()
-rowsChunk := make([]models.UnknownRowValues, 0, chunkSize)
+batchRows := make([]models.UnknownRowValues, 0, batchSize)
 for rows.Next() {
 values, err := rows.Values()
@@ -82,17 +82,17 @@ func (postgresEx *PostgresExtractor) ProcessBatch(
 return errors.New("Unexpected error reading rows from source")
 }
-rowsChunk = append(rowsChunk, values)
+batchRows = append(batchRows, values)
-if len(rowsChunk) >= chunkSize {
+if len(batchRows) >= batchSize {
 select {
-case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 case <-ctx.Done():
 return nil
 }
-atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+atomic.AddInt64(rowsRead, int64(len(batchRows)))
-rowsChunk = make([]models.UnknownRowValues, 0, chunkSize)
+batchRows = make([]models.UnknownRowValues, 0, batchSize)
 }
 }
@@ -100,14 +100,14 @@ func (postgresEx *PostgresExtractor) ProcessBatch(
 return errors.New("Unexpected error reading rows from source")
 }
-if len(rowsChunk) > 0 {
+if len(batchRows) > 0 {
 select {
-case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 case <-ctx.Done():
 return nil
 }
-atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+atomic.AddInt64(rowsRead, int64(len(batchRows)))
 }
 return nil
@@ -117,12 +117,12 @@ func (postgresEx *PostgresExtractor) Exec(
 ctx context.Context,
 tableInfo config.SourceTableInfo,
 columns []models.ColumnType,
-chunkSize int,
+batchSize int,
-chBatchesIn <-chan models.Partition,
+chPartitionsIn <-chan models.Partition,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 chErrorsOut chan<- custom_errors.ExtractorError,
 chJobErrorsOut chan<- custom_errors.JobError,
-wgActiveBatches *sync.WaitGroup,
+wgActivePartitions *sync.WaitGroup,
 rowsRead *int64,
 ) {
 }

View File

@@ -34,18 +34,18 @@ func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
 return result
 }
-func (postgresLd *PostgresLoader) ProcessChunk(
+func (postgresLd *PostgresLoader) ProcessBatch(
 ctx context.Context,
 tableInfo config.TargetTableInfo,
 colNames []string,
-chunk models.Batch,
+batch models.Batch,
 ) (int, error) {
 tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
 _, err := postgresLd.db.CopyFrom(
 ctx,
 tableId,
 colNames,
-pgx.CopyFromRows(chunk.Data),
+pgx.CopyFromRows(batch.Rows),
 )
 if err != nil {
@@ -60,20 +60,20 @@ func (postgresLd *PostgresLoader) ProcessChunk(
 }
 }
-return 0, &custom_errors.LoaderError{Batch: chunk, Msg: err.Error()}
+return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
 }
-return len(chunk.Data), nil
+return len(batch.Rows), nil
 }
 func (postgresLd *PostgresLoader) Exec(
 ctx context.Context,
 tableInfo config.TargetTableInfo,
 columns []models.ColumnType,
-chChunksIn <-chan models.Batch,
+chBatchesIn <-chan models.Batch,
 chErrorsOut chan<- custom_errors.LoaderError,
 chJobErrorsOut chan<- custom_errors.JobError,
-wgActiveChunks *sync.WaitGroup,
+wgActiveBatches *sync.WaitGroup,
 rowsLoaded *int64,
 ) {
 colNames := mapSlice(columns, func(col models.ColumnType) string {
@@ -88,12 +88,12 @@ func (postgresLd *PostgresLoader) Exec(
 select {
 case <-ctx.Done():
 return
-case chunk, ok := <-chChunksIn:
+case batch, ok := <-chBatchesIn:
 if !ok {
 return
 }
-processedRows, err := postgresLd.ProcessChunk(ctx, tableInfo, colNames, chunk)
+processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
 if err != nil {
 var ldError *custom_errors.LoaderError
@@ -117,7 +117,7 @@ func (postgresLd *PostgresLoader) Exec(
 return
 }
-wgActiveChunks.Done()
+wgActiveBatches.Done()
 atomic.AddInt64(rowsLoaded, int64(processedRows))
 }
 }
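
The loader pushes each batch through pgx's bulk COPY path: CopyFrom takes a schema-qualified identifier, the target column names, and a CopyFromSource built from the batch's [][]any rows. A standalone usage sketch; the DSN, table, and columns here are placeholders:

package main

import (
    "context"
    "log"

    "github.com/jackc/pgx/v5"
)

func main() {
    ctx := context.Background()
    conn, err := pgx.Connect(ctx, "postgres://user:pass@localhost:5432/db")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)

    rows := [][]any{ // one []any per row, in column order
        {int64(1), "alice"},
        {int64(2), "bob"},
    }
    copied, err := conn.CopyFrom(
        ctx,
        pgx.Identifier{"public", "users"}, // schema-qualified target table
        []string{"id", "name"},            // column names
        pgx.CopyFromRows(rows),
    )
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("copied %d rows", copied)
}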

View File

@@ -60,15 +60,15 @@ func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransfor
 return plan
 }
-const processChunkCtxCheck = 4096
+const processBatchCtxCheck = 4096
-func (mssqlTr *MssqlTransformer) ProcessChunk(
+func (mssqlTr *MssqlTransformer) ProcessBatch(
 ctx context.Context,
-chunk *models.Batch,
+batch *models.Batch,
 transformationPlan []etl.ColumnTransformPlan,
 ) error {
-for i, rowValues := range chunk.Data {
+for i, rowValues := range batch.Rows {
-if i%processChunkCtxCheck == 0 {
+if i%processBatchCtxCheck == 0 {
 if err := ctx.Err(); err != nil {
 return err
 }
@@ -94,10 +94,10 @@ func (mssqlTr *MssqlTransformer) ProcessChunk(
 func (mssqlTr *MssqlTransformer) Exec(
 ctx context.Context,
 columns []models.ColumnType,
-chChunksIn <-chan models.Batch,
+chBatchesIn <-chan models.Batch,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 chJobErrorsOut chan<- custom_errors.JobError,
-wgActiveChunks *sync.WaitGroup,
+wgActiveBatches *sync.WaitGroup,
 ) {
 transformationPlan := computeTransformationPlan(columns)
@@ -110,22 +110,22 @@ func (mssqlTr *MssqlTransformer) Exec(
 case <-ctx.Done():
 return
-case chunk, ok := <-chChunksIn:
+case batch, ok := <-chBatchesIn:
 if !ok {
 return
 }
 if len(transformationPlan) == 0 {
 select {
-case chChunksOut <- chunk:
+case chBatchesOut <- batch:
-wgActiveChunks.Add(1)
+wgActiveBatches.Add(1)
 continue
 case <-ctx.Done():
 return
 }
 }
-err := mssqlTr.ProcessChunk(ctx, &chunk, transformationPlan)
+err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
 if err != nil {
 if errors.Is(err, ctx.Err()) {
 return
@@ -139,12 +139,12 @@ func (mssqlTr *MssqlTransformer) Exec(
 }
 select {
-case chChunksOut <- chunk:
+case chBatchesOut <- batch:
 case <-ctx.Done():
 return
 }
-wgActiveChunks.Add(1)
+wgActiveBatches.Add(1)
 }
 }
 }
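
ProcessBatch polls ctx.Err() only once every processBatchCtxCheck (4096) rows, which keeps cancellation responsive without paying a context check on every row. A minimal sketch of that amortized-cancellation pattern:

package main

import (
    "context"
    "errors"
    "fmt"
)

const ctxCheckEvery = 4096

func transformRows(ctx context.Context, rows [][]any) error {
    for i, row := range rows {
        if i%ctxCheckEvery == 0 { // amortized: one check per 4096 rows
            if err := ctx.Err(); err != nil {
                return err // canceled or deadline exceeded
            }
        }
        _ = row // per-column transforms would run here
    }
    return nil
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // cancel immediately to show the early return
    err := transformRows(ctx, make([][]any, 10000))
    fmt.Println(errors.Is(err, context.Canceled)) // true
}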

View File

@@ -10,14 +10,14 @@ import (
 )
 type Extractor interface {
-ProcessBatch(
+ProcessPartition(
 ctx context.Context,
 tableInfo config.SourceTableInfo,
 columns []models.ColumnType,
-chunkSize int,
+batchSize int,
-batch models.Partition,
+partition models.Partition,
 indexPrimaryKey int,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 rowsRead *int64,
 ) error
@@ -25,12 +25,12 @@ type Extractor interface {
 ctx context.Context,
 tableInfo config.SourceTableInfo,
 columns []models.ColumnType,
-chunkSize int,
+batchSize int,
-chBatchesIn <-chan models.Partition,
+chPartitionsIn <-chan models.Partition,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 chErrorsOut chan<- custom_errors.ExtractorError,
 chJobErrorsOut chan<- custom_errors.JobError,
-wgActiveBatches *sync.WaitGroup,
+wgActivePartitions *sync.WaitGroup,
 rowsRead *int64,
 )
 }
@@ -43,38 +43,38 @@ type ColumnTransformPlan struct {
 }
 type Transformer interface {
-ProcessChunk(
+ProcessBatch(
 ctx context.Context,
-chunk *models.Batch,
+batch *models.Batch,
 transformationPlan []ColumnTransformPlan,
 ) error
 Exec(
 ctx context.Context,
 columns []models.ColumnType,
-chChunksIn <-chan models.Batch,
+chBatchesIn <-chan models.Batch,
-chChunksOut chan<- models.Batch,
+chBatchesOut chan<- models.Batch,
 chJobErrorsOut chan<- custom_errors.JobError,
-wgActiveChunks *sync.WaitGroup,
+wgActiveBatches *sync.WaitGroup,
 )
 }
 type Loader interface {
-ProcessChunk(
+ProcessBatch(
 ctx context.Context,
 tableInfo config.TargetTableInfo,
 colNames []string,
-chunk models.Batch,
+batch models.Batch,
 ) (int, error)
 Exec(
 ctx context.Context,
 tableInfo config.TargetTableInfo,
 columns []models.ColumnType,
-chChunksIn <-chan models.Batch,
+chBatchesIn <-chan models.Batch,
 chErrorsOut chan<- custom_errors.LoaderError,
 chJobErrorsOut chan<- custom_errors.JobError,
-wgActiveChunks *sync.WaitGroup,
+wgActiveBatches *sync.WaitGroup,
 rowsLoaded *int64,
 )
 }

View File

@@ -7,7 +7,7 @@ type UnknownRowValues = []any
 type Batch struct {
 Id uuid.UUID
 PartitionId uuid.UUID
-Data []UnknownRowValues
+Rows []UnknownRowValues
 RetryCounter int
 }