diff --git a/cmd/go_migrate/batch-generator.go b/cmd/go_migrate/batch-generator.go
index b0488b9..78bc487 100644
--- a/cmd/go_migrate/batch-generator.go
+++ b/cmd/go_migrate/batch-generator.go
@@ -82,7 +82,7 @@ ORDER BY batch_id`,
 	return batches, nil
 }
 
-func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]models.Partition, error) {
+func partitionGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]models.Partition, error) {
 	rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo)
 	if err != nil {
 		return nil, err
diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go
index 21b3fc7..626aad4 100644
--- a/cmd/go_migrate/process.go
+++ b/cmd/go_migrate/process.go
@@ -44,20 +44,20 @@ func processMigrationJob(
 	jobCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerBatch)
+	partitions, err := partitionGeneratorMssql(jobCtx, sourceDb, job.SourceTable, job.RowsPerBatch)
 	if err != nil {
 		log.Error("Unexpected error calculating batch ranges: ", err)
 	}
 
 	chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
-	chBatches := make(chan models.Partition, job.QueueSize)
 	chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
-	chChunksRaw := make(chan models.Batch, job.QueueSize)
-	chChunksTransformed := make(chan models.Batch, job.QueueSize)
 	chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
+	chPartitions := make(chan models.Partition, job.QueueSize)
+	chBatchesRaw := make(chan models.Batch, job.QueueSize)
+	chBatchesTransformed := make(chan models.Batch, job.QueueSize)
 
+	var wgActivePartitions sync.WaitGroup
 	var wgActiveBatches sync.WaitGroup
-	var wgActiveChunks sync.WaitGroup
 	var wgExtractors sync.WaitGroup
 	var wgTransformers sync.WaitGroup
 	var wgLoaders sync.WaitGroup
@@ -69,10 +69,10 @@ func processMigrationJob(
 		}
 	}()
 
-	go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
-	go custom_errors.LoaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
+	go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chPartitions, chJobErrors, &wgActivePartitions)
+	go custom_errors.LoaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chBatchesTransformed, chJobErrors, &wgActiveBatches)
 
-	maxExtractors := min(job.MaxExtractors, len(batches))
+	maxExtractors := min(job.MaxExtractors, len(partitions))
 	log.Infof("Starting %d extractor(s)...", maxExtractors)
 
 	for range maxExtractors {
@@ -82,20 +82,20 @@ func processMigrationJob(
 				job.SourceTable,
 				sourceColTypes,
 				job.ChunkSize,
-				chBatches,
-				chChunksRaw,
+				chPartitions,
+				chBatchesRaw,
 				chExtractorErrors,
 				chJobErrors,
-				&wgActiveBatches,
+				&wgActivePartitions,
 				&rowsRead,
 			)
 		})
 	}
 
-	wgActiveBatches.Add(len(batches))
+	wgActivePartitions.Add(len(partitions))
 
 	go func() {
-		for _, batch := range batches {
-			chBatches <- batch
+		for _, batch := range partitions {
+			chPartitions <- batch
 		}
 	}()
@@ -106,10 +106,10 @@ func processMigrationJob(
 			transformer.Exec(
 				jobCtx,
 				sourceColTypes,
-				chChunksRaw,
-				chChunksTransformed,
+				chBatchesRaw,
+				chBatchesTransformed,
 				chJobErrors,
-				&wgActiveChunks,
+				&wgActiveBatches,
 			)
 		})
 	}
@@ -122,27 +122,27 @@ func processMigrationJob(
 				jobCtx,
 				job.TargetTable,
 				targetColTypes,
-				chChunksTransformed,
+				chBatchesTransformed,
 				chLoadersErrors,
 				chJobErrors,
-				&wgActiveChunks,
+				&wgActiveBatches,
 				&rowsLoaded,
 			)
 		})
 	}
 
 	go func() {
-		wgActiveBatches.Wait()
-		close(chBatches)
+		wgActivePartitions.Wait()
+		close(chPartitions)
 		close(chExtractorErrors)
 
 		wgExtractors.Wait()
-		close(chChunksRaw)
+		close(chBatchesRaw)
 
 		wgTransformers.Wait()
-		wgActiveChunks.Wait()
-		close(chChunksTransformed)
+		wgActiveBatches.Wait()
+		close(chBatchesTransformed)
 		close(chLoadersErrors)
 
 		wgLoaders.Wait()
diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go
index ad4ccf0..f1c1bd5 100644
--- a/internal/app/custom_errors/extractor.error.go
+++ b/internal/app/custom_errors/extractor.error.go
@@ -10,7 +10,7 @@ import (
 )
 
 type ExtractorError struct {
-	Batch     models.Partition
+	Partition models.Partition
 	LastId    int64
 	HasLastId bool
 	Msg       string
@@ -24,9 +24,9 @@ func ExtractorErrorHandler(
 	ctx context.Context,
 	maxRetryAttempts int,
 	chErrorsIn <-chan ExtractorError,
-	chBatchesOut chan<- models.Partition,
+	chPartitionsOut chan<- models.Partition,
 	chJobErrorsOut chan<- JobError,
-	wgActiveBatches *sync.WaitGroup,
+	wgActivePartitions *sync.WaitGroup,
 ) {
 	for {
 		if ctx.Err() != nil {
@@ -42,10 +42,10 @@ func ExtractorErrorHandler(
 			return
 		}
 
-		if err.Batch.RetryCounter >= maxRetryAttempts {
+		if err.Partition.RetryCounter >= maxRetryAttempts {
 			jobError := JobError{
 				ShouldCancelJob: false,
-				Msg:             fmt.Sprintf("batch %v reached max retries (%d)", err.Batch.Id, maxRetryAttempts),
+				Msg:             fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, maxRetryAttempts),
 				Prev:            &err,
 			}
@@ -55,22 +55,22 @@ func ExtractorErrorHandler(
 				return
 			}
 
-			wgActiveBatches.Done()
+			wgActivePartitions.Done()
 			continue
 		}
 
-		newBatch := err.Batch
-		newBatch.RetryCounter++
+		newPartition := err.Partition
+		newPartition.RetryCounter++
 
 		if err.HasLastId {
-			newBatch.ParentId = err.Batch.Id
-			newBatch.Id = uuid.New()
-			newBatch.LowerLimit = err.LastId
-			newBatch.IsLowerLimitInclusive = false
+			newPartition.ParentId = err.Partition.Id
+			newPartition.Id = uuid.New()
+			newPartition.LowerLimit = err.LastId
+			newPartition.IsLowerLimitInclusive = false
 		}
 
 		select {
-		case chBatchesOut <- newBatch:
+		case chPartitionsOut <- newPartition:
 		case <-ctx.Done():
 			return
 		}
diff --git a/internal/app/custom_errors/loader.error.go b/internal/app/custom_errors/loader.error.go
index 4f03e9d..a568285 100644
--- a/internal/app/custom_errors/loader.error.go
+++ b/internal/app/custom_errors/loader.error.go
@@ -21,9 +21,9 @@ func LoaderErrorHandler(
 	ctx context.Context,
 	maxRetryAttempts int,
 	chErrorsIn <-chan LoaderError,
-	chChunksOut chan<- models.Batch,
+	chBatchesOut chan<- models.Batch,
 	chJobErrorsOut chan<- JobError,
-	wgActiveChunks *sync.WaitGroup,
+	wgActiveBatches *sync.WaitGroup,
 ) {
 	for {
 		if ctx.Err() != nil {
@@ -42,7 +42,7 @@ func LoaderErrorHandler(
 		if err.RetryCounter >= maxRetryAttempts {
 			jobError := JobError{
 				ShouldCancelJob: false,
-				Msg:             fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts),
+				Msg:             fmt.Sprintf("Batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
 				Prev:            &err,
 			}
@@ -52,14 +52,14 @@ func LoaderErrorHandler(
 				return
 			}
 
-			wgActiveChunks.Done()
+			wgActiveBatches.Done()
 			continue
 		}
 
 		err.RetryCounter++
 
 		select {
-		case chChunksOut <- err.Batch:
+		case chBatchesOut <- err.Batch:
 		case <-ctx.Done():
 			return
 		}
diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go
index 629377d..3535c85 100644
--- a/internal/app/etl/extractors/mssql.go
+++ b/internal/app/etl/extractors/mssql.go
@@ -70,20 +70,20 @@
 	return sbQuery.String()
 }
 
-func extractorErrorFromLastRowMssql(
+func errorFromLastRow(
 	lastRow models.UnknownRowValues,
 	indexPrimaryKey int,
-	batch *models.Partition,
+	partition *models.Partition,
 	previousError error,
 ) *custom_errors.ExtractorError {
 	lastIdRawValue := lastRow[indexPrimaryKey]
 	lastId, ok := convert.ToInt64(lastIdRawValue)
 	if !ok {
-		currentBatch := *batch
-		currentBatch.RetryCounter = 3
+		currentPartition := *partition
+		currentPartition.RetryCounter = 3
 
 		return &custom_errors.ExtractorError{
-			Batch:     currentBatch,
+			Partition: currentPartition,
 			HasLastId: true,
 			Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
 		}
@@ -91,78 +91,78 @@
 	}
 
 	return &custom_errors.ExtractorError{
-		Batch:     *batch,
+		Partition: *partition,
 		HasLastId: true,
 		LastId:    lastId,
 		Msg:       previousError.Error(),
 	}
 }
 
-func (mssqlEx *MssqlExtractor) ProcessBatch(
+func (mssqlEx *MssqlExtractor) ProcessPartition(
 	ctx context.Context,
 	tableInfo config.SourceTableInfo,
 	columns []models.ColumnType,
-	chunkSize int,
-	batch models.Partition,
+	batchSize int,
+	partition models.Partition,
 	indexPrimaryKey int,
-	chChunksOut chan<- models.Batch,
+	chBatchesOut chan<- models.Batch,
 	rowsRead *int64,
 ) error {
-	query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
+	query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive)
 
 	var queryArgs []any
-	if batch.ShouldUseRange {
+	if partition.ShouldUseRange {
 		queryArgs = append(queryArgs,
-			sql.Named("min", batch.LowerLimit),
-			sql.Named("max", batch.UpperLimit),
+			sql.Named("min", partition.LowerLimit),
+			sql.Named("max", partition.UpperLimit),
 		)
 	}
 
 	rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...)
 	if err != nil {
-		return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+		return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 	}
 	defer rows.Close()
 
-	rowsChunk := make([]models.UnknownRowValues, 0, chunkSize)
+	batchRows := make([]models.UnknownRowValues, 0, batchSize)
 
 	for rows.Next() {
-		values := make([]any, len(columns))
+		rowValues := make([]any, len(columns))
 		scanArgs := make([]any, len(columns))
-		for i := range values {
-			scanArgs[i] = &values[i]
+		for i := range rowValues {
+			scanArgs[i] = &rowValues[i]
 		}
 
 		if err := rows.Scan(scanArgs...); err != nil {
-			if len(rowsChunk) == 0 {
-				return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+			if len(batchRows) == 0 {
+				return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 			}
 
-			lastRow := rowsChunk[len(rowsChunk)-1]
+			lastRow := batchRows[len(batchRows)-1]
 
 			select {
-			case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 			case <-ctx.Done():
 				return nil
 			}
 
-			atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+			atomic.AddInt64(rowsRead, int64(len(batchRows)))
 
-			return extractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
+			return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err)
 		}
 
-		rowsChunk = append(rowsChunk, values)
+		batchRows = append(batchRows, rowValues)
 
-		if len(rowsChunk) >= chunkSize {
+		if len(batchRows) >= batchSize {
 			select {
-			case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 			case <-ctx.Done():
 				return nil
 			}
 
-			atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
-			rowsChunk = make([]models.UnknownRowValues, 0, chunkSize)
+			atomic.AddInt64(rowsRead, int64(len(batchRows)))
+			batchRows = make([]models.UnknownRowValues, 0, batchSize)
 		}
 	}
 
@@ -171,22 +171,22 @@ func (mssqlEx *MssqlExtractor) ProcessBatch(
 			return ctx.Err()
 		}
 
-		if len(rowsChunk) == 0 {
-			return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+		if len(batchRows) == 0 {
+			return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 		}
 
-		lastRow := rowsChunk[len(rowsChunk)-1]
-		return extractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
+		lastRow := batchRows[len(batchRows)-1]
+		return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err)
 	}
 
-	if len(rowsChunk) > 0 {
+	if len(batchRows) > 0 {
 		select {
-		case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+		case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 		case <-ctx.Done():
 			return nil
 		}
 
-		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+		atomic.AddInt64(rowsRead, int64(len(batchRows)))
 	}
 
 	return nil
@@ -196,12 +196,12 @@ func (mssqlEx *MssqlExtractor) Exec(
 	ctx context.Context,
 	tableInfo config.SourceTableInfo,
 	columns []models.ColumnType,
-	chunkSize int,
-	chBatchesIn <-chan models.Partition,
-	chChunksOut chan<- models.Batch,
+	batchSize int,
+	chPartitionsIn <-chan models.Partition,
+	chBatchesOut chan<- models.Batch,
 	chErrorsOut chan<- custom_errors.ExtractorError,
 	chJobErrorsOut chan<- custom_errors.JobError,
-	wgActiveBatches *sync.WaitGroup,
+	wgActivePartitions *sync.WaitGroup,
 	rowsRead *int64,
 ) {
 	indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
@@ -229,19 +229,19 @@ func (mssqlEx *MssqlExtractor) Exec(
 		select {
 		case <-ctx.Done():
 			return
-		case batch, ok := <-chBatchesIn:
+		case partition, ok := <-chPartitionsIn:
 			if !ok {
 				return
 			}
 
-			err := mssqlEx.ProcessBatch(
+			err := mssqlEx.ProcessPartition(
 				ctx,
 				tableInfo,
 				columns,
-				chunkSize,
-				batch,
+				batchSize,
+				partition,
 				indexPrimaryKey,
-				chChunksOut,
+				chBatchesOut,
 				rowsRead,
 			)
 
@@ -267,7 +267,7 @@ func (mssqlEx *MssqlExtractor) Exec(
 				return
 			}
 
-			wgActiveBatches.Done()
+			wgActivePartitions.Done()
 		}
 	}
 }
diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go
index 374b5da..6cd1d3a 100644
--- a/internal/app/etl/extractors/postgres.go
+++ b/internal/app/etl/extractors/postgres.go
@@ -52,29 +52,29 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
 	return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
 }
 
-func (postgresEx *PostgresExtractor) ProcessBatch(
+func (postgresEx *PostgresExtractor) ProcessPartition(
 	ctx context.Context,
 	tableInfo config.SourceTableInfo,
 	columns []models.ColumnType,
-	chunkSize int,
-	batch models.Partition,
+	batchSize int,
+	partition models.Partition,
 	indexPrimaryKey int,
-	chChunksOut chan<- models.Batch,
+	chBatchesOut chan<- models.Batch,
 	rowsRead *int64,
 ) error {
 	query := buildExtractQueryPostgres(tableInfo, columns)
 
-	if batch.ShouldUseRange {
+	if partition.ShouldUseRange {
 		return errors.New("Batch config not yet supported")
 	}
 
 	rows, err := postgresEx.db.Query(ctx, query)
 	if err != nil {
-		return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
+		return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 	}
 	defer rows.Close()
 
-	rowsChunk := make([]models.UnknownRowValues, 0, chunkSize)
+	batchRows := make([]models.UnknownRowValues, 0, batchSize)
 
 	for rows.Next() {
 		values, err := rows.Values()
@@ -82,17 +82,17 @@ func (postgresEx *PostgresExtractor) ProcessBatch(
 			return errors.New("Unexpected error reading rows from source")
 		}
 
-		rowsChunk = append(rowsChunk, values)
+		batchRows = append(batchRows, values)
 
-		if len(rowsChunk) >= chunkSize {
+		if len(batchRows) >= batchSize {
 			select {
-			case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+			case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 			case <-ctx.Done():
 				return nil
 			}
 
-			atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
-			rowsChunk = make([]models.UnknownRowValues, 0, chunkSize)
+			atomic.AddInt64(rowsRead, int64(len(batchRows)))
+			batchRows = make([]models.UnknownRowValues, 0, batchSize)
 		}
 	}
 
@@ -100,14 +100,14 @@ func (postgresEx *PostgresExtractor) ProcessBatch(
 		return errors.New("Unexpected error reading rows from source")
 	}
 
-	if len(rowsChunk) > 0 {
+	if len(batchRows) > 0 {
 		select {
-		case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
+		case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 		case <-ctx.Done():
 			return nil
 		}
 
-		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
+		atomic.AddInt64(rowsRead, int64(len(batchRows)))
 	}
 
 	return nil
@@ -117,12 +117,12 @@ func (postgresEx *PostgresExtractor) Exec(
 	ctx context.Context,
 	tableInfo config.SourceTableInfo,
 	columns []models.ColumnType,
-	chunkSize int,
-	chBatchesIn <-chan models.Partition,
-	chChunksOut chan<- models.Batch,
+	batchSize int,
+	chPartitionsIn <-chan models.Partition,
+	chBatchesOut chan<- models.Batch,
 	chErrorsOut chan<- custom_errors.ExtractorError,
 	chJobErrorsOut chan<- custom_errors.JobError,
-	wgActiveBatches *sync.WaitGroup,
+	wgActivePartitions *sync.WaitGroup,
 	rowsRead *int64,
 ) {
 }
diff --git a/internal/app/etl/loaders/postgres.go b/internal/app/etl/loaders/postgres.go
index c9d2276..fc6bc88 100644
--- a/internal/app/etl/loaders/postgres.go
+++ b/internal/app/etl/loaders/postgres.go
@@ -34,18 +34,18 @@ func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
 	return result
 }
 
-func (postgresLd *PostgresLoader) ProcessChunk(
+func (postgresLd *PostgresLoader) ProcessBatch(
 	ctx context.Context,
 	tableInfo config.TargetTableInfo,
 	colNames []string,
-	chunk models.Batch,
+	batch models.Batch,
 ) (int, error) {
 	tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
 
 	_, err := postgresLd.db.CopyFrom(
 		ctx,
 		tableId,
 		colNames,
-		pgx.CopyFromRows(chunk.Data),
+		pgx.CopyFromRows(batch.Rows),
 	)
 
 	if err != nil {
@@ -60,20 +60,20 @@ func (postgresLd *PostgresLoader) ProcessChunk(
 			}
 		}
 
-		return 0, &custom_errors.LoaderError{Batch: chunk, Msg: err.Error()}
+		return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
 	}
 
-	return len(chunk.Data), nil
+	return len(batch.Rows), nil
 }
 
 func (postgresLd *PostgresLoader) Exec(
 	ctx context.Context,
 	tableInfo config.TargetTableInfo,
 	columns []models.ColumnType,
-	chChunksIn <-chan models.Batch,
+	chBatchesIn <-chan models.Batch,
 	chErrorsOut chan<- custom_errors.LoaderError,
 	chJobErrorsOut chan<- custom_errors.JobError,
-	wgActiveChunks *sync.WaitGroup,
+	wgActiveBatches *sync.WaitGroup,
 	rowsLoaded *int64,
 ) {
 	colNames := mapSlice(columns, func(col models.ColumnType) string {
@@ -88,12 +88,12 @@ func (postgresLd *PostgresLoader) Exec(
 		select {
 		case <-ctx.Done():
 			return
-		case chunk, ok := <-chChunksIn:
+		case batch, ok := <-chBatchesIn:
 			if !ok {
 				return
 			}
 
-			processedRows, err := postgresLd.ProcessChunk(ctx, tableInfo, colNames, chunk)
+			processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
 			if err != nil {
 				var ldError *custom_errors.LoaderError
@@ -117,7 +117,7 @@ func (postgresLd *PostgresLoader) Exec(
 				return
 			}
 
-			wgActiveChunks.Done()
+			wgActiveBatches.Done()
 			atomic.AddInt64(rowsLoaded, int64(processedRows))
 		}
 	}
diff --git a/internal/app/etl/transformers/mssql.go b/internal/app/etl/transformers/mssql.go
index 23dd68e..7270ebb 100644
--- a/internal/app/etl/transformers/mssql.go
+++ b/internal/app/etl/transformers/mssql.go
@@ -60,15 +60,15 @@ func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransfor
 	return plan
 }
 
-const processChunkCtxCheck = 4096
+const processBatchCtxCheck = 4096
 
-func (mssqlTr *MssqlTransformer) ProcessChunk(
+func (mssqlTr *MssqlTransformer) ProcessBatch(
 	ctx context.Context,
-	chunk *models.Batch,
+	batch *models.Batch,
 	transformationPlan []etl.ColumnTransformPlan,
 ) error {
-	for i, rowValues := range chunk.Data {
-		if i%processChunkCtxCheck == 0 {
+	for i, rowValues := range batch.Rows {
+		if i%processBatchCtxCheck == 0 {
 			if err := ctx.Err(); err != nil {
 				return err
 			}
@@ -94,10 +94,10 @@ func (mssqlTr *MssqlTransformer) Exec(
 	ctx context.Context,
 	columns []models.ColumnType,
-	chChunksIn <-chan models.Batch,
-	chChunksOut chan<- models.Batch,
+	chBatchesIn <-chan models.Batch,
+	chBatchesOut chan<- models.Batch,
 	chJobErrorsOut chan<- custom_errors.JobError,
-	wgActiveChunks *sync.WaitGroup,
+	wgActiveBatches *sync.WaitGroup,
 ) {
 	transformationPlan := computeTransformationPlan(columns)
@@ -110,22 +110,22 @@ func (mssqlTr *MssqlTransformer) Exec(
 		case <-ctx.Done():
 			return
 
-		case chunk, ok := <-chChunksIn:
+		case batch, ok := <-chBatchesIn:
 			if !ok {
 				return
 			}
 
 			if len(transformationPlan) == 0 {
 				select {
-				case chChunksOut <- chunk:
-					wgActiveChunks.Add(1)
+				case chBatchesOut <- batch:
+					wgActiveBatches.Add(1)
 					continue
 				case <-ctx.Done():
 					return
 				}
 			}
 
-			err := mssqlTr.ProcessChunk(ctx, &chunk, transformationPlan)
+			err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
 			if err != nil {
 				if errors.Is(err, ctx.Err()) {
 					return
@@ -139,12 +139,12 @@ func (mssqlTr *MssqlTransformer) Exec(
 			}
 
 			select {
-			case chChunksOut <- chunk:
+			case chBatchesOut <- batch:
 			case <-ctx.Done():
 				return
 			}
 
-			wgActiveChunks.Add(1)
+			wgActiveBatches.Add(1)
 		}
 	}
 }
diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go
index 05fd79d..c3e2f60 100644
--- a/internal/app/etl/types.go
+++ b/internal/app/etl/types.go
@@ -10,14 +10,14 @@ import (
 )
 
 type Extractor interface {
-	ProcessBatch(
+	ProcessPartition(
 		ctx context.Context,
 		tableInfo config.SourceTableInfo,
 		columns []models.ColumnType,
-		chunkSize int,
-		batch models.Partition,
+		batchSize int,
+		partition models.Partition,
 		indexPrimaryKey int,
-		chChunksOut chan<- models.Batch,
+		chBatchesOut chan<- models.Batch,
 		rowsRead *int64,
 	) error
 
@@ -25,12 +25,12 @@ type Extractor interface {
 		ctx context.Context,
 		tableInfo config.SourceTableInfo,
 		columns []models.ColumnType,
-		chunkSize int,
-		chBatchesIn <-chan models.Partition,
-		chChunksOut chan<- models.Batch,
+		batchSize int,
+		chPartitionsIn <-chan models.Partition,
+		chBatchesOut chan<- models.Batch,
 		chErrorsOut chan<- custom_errors.ExtractorError,
 		chJobErrorsOut chan<- custom_errors.JobError,
-		wgActiveBatches *sync.WaitGroup,
+		wgActivePartitions *sync.WaitGroup,
 		rowsRead *int64,
 	)
 }
@@ -43,38 +43,38 @@ type ColumnTransformPlan struct {
 }
 
 type Transformer interface {
-	ProcessChunk(
+	ProcessBatch(
 		ctx context.Context,
-		chunk *models.Batch,
+		batch *models.Batch,
 		transformationPlan []ColumnTransformPlan,
 	) error
 
 	Exec(
 		ctx context.Context,
 		columns []models.ColumnType,
-		chChunksIn <-chan models.Batch,
-		chChunksOut chan<- models.Batch,
+		chBatchesIn <-chan models.Batch,
+		chBatchesOut chan<- models.Batch,
 		chJobErrorsOut chan<- custom_errors.JobError,
-		wgActiveChunks *sync.WaitGroup,
+		wgActiveBatches *sync.WaitGroup,
 	)
 }
 
 type Loader interface {
-	ProcessChunk(
+	ProcessBatch(
 		ctx context.Context,
 		tableInfo config.TargetTableInfo,
 		colNames []string,
-		chunk models.Batch,
+		batch models.Batch,
 	) (int, error)
 
 	Exec(
 		ctx context.Context,
 		tableInfo config.TargetTableInfo,
 		columns []models.ColumnType,
-		chChunksIn <-chan models.Batch,
+		chBatchesIn <-chan models.Batch,
 		chErrorsOut chan<- custom_errors.LoaderError,
 		chJobErrorsOut chan<- custom_errors.JobError,
-		wgActiveChunks *sync.WaitGroup,
+		wgActiveBatches *sync.WaitGroup,
 		rowsLoaded *int64,
 	)
 }
diff --git a/internal/app/models/main.go b/internal/app/models/main.go
index 6be796e..6eafb8d 100644
--- a/internal/app/models/main.go
+++ b/internal/app/models/main.go
@@ -7,7 +7,7 @@ type UnknownRowValues = []any
 
 type Batch struct {
 	Id           uuid.UUID
 	PartitionId  uuid.UUID
-	Data         []UnknownRowValues
+	Rows         []UnknownRowValues
 	RetryCounter int
 }