diff --git a/cmd/go_migrate/batch-generator.go b/cmd/go_migrate/batch-generator.go index cbba562..b0488b9 100644 --- a/cmd/go_migrate/batch-generator.go +++ b/cmd/go_migrate/batch-generator.go @@ -33,7 +33,7 @@ GROUP BY t.name` return rowsCount, nil } -func calculateBatchesMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, batchCount int64) ([]models.Batch, error) { +func calculateBatchesMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, batchCount int64) ([]models.Partition, error) { query := fmt.Sprintf(` SELECT MIN([%s]) AS lower_limit, @@ -58,10 +58,10 @@ ORDER BY batch_id`, } defer rows.Close() - batches := make([]models.Batch, 0, batchCount) + batches := make([]models.Partition, 0, batchCount) for rows.Next() { - batch := models.Batch{ + batch := models.Partition{ Id: uuid.New(), ShouldUseRange: true, RetryCounter: 0, @@ -82,7 +82,7 @@ ORDER BY batch_id`, return batches, nil } -func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]models.Batch, error) { +func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]models.Partition, error) { rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo) if err != nil { return nil, err @@ -92,7 +92,7 @@ func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.Sourc if rowsCount > rowsPerBatch { batchCount = rowsCount / rowsPerBatch } else { - return []models.Batch{{ + return []models.Partition{{ Id: uuid.New(), ShouldUseRange: false, RetryCounter: 0, diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 1641eeb..21b3fc7 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -50,10 +50,10 @@ func processMigrationJob( } chJobErrors := make(chan custom_errors.JobError, job.QueueSize) - chBatches := make(chan models.Batch, job.QueueSize) + chBatches := make(chan models.Partition, job.QueueSize) chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize) - chChunksRaw := make(chan models.Chunk, job.QueueSize) - chChunksTransformed := make(chan models.Chunk, job.QueueSize) + chChunksRaw := make(chan models.Batch, job.QueueSize) + chChunksTransformed := make(chan models.Batch, job.QueueSize) chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize) var wgActiveBatches sync.WaitGroup diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index 679641e..9bb6f4c 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -23,14 +23,17 @@ type JobConfig struct { Retry RetryConfig `yaml:"retry"` } -type TargetTableInfo struct { +type TableInfo struct { Schema string `yaml:"schema"` Table string `yaml:"table"` } +type TargetTableInfo struct { + TableInfo `yaml:",inline"` +} + type SourceTableInfo struct { - Schema string `yaml:"schema"` - Table string `yaml:"table"` + TableInfo `yaml:",inline"` PrimaryKey string `yaml:"primary_key"` } diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go index 4ff4c4f..ad4ccf0 100644 --- a/internal/app/custom_errors/extractor.error.go +++ b/internal/app/custom_errors/extractor.error.go @@ -10,7 +10,7 @@ import ( ) type ExtractorError struct { - Batch models.Batch + Batch models.Partition LastId int64 HasLastId bool Msg string @@ -24,7 +24,7 @@ func ExtractorErrorHandler( ctx context.Context, maxRetryAttempts int, chErrorsIn <-chan ExtractorError, - chBatchesOut chan<- models.Batch, + chBatchesOut chan<- models.Partition, chJobErrorsOut chan<- JobError, wgActiveBatches *sync.WaitGroup, ) { diff --git a/internal/app/custom_errors/loader.error.go b/internal/app/custom_errors/loader.error.go index 3114b67..4f03e9d 100644 --- a/internal/app/custom_errors/loader.error.go +++ b/internal/app/custom_errors/loader.error.go @@ -9,7 +9,7 @@ import ( ) type LoaderError struct { - models.Chunk + models.Batch Msg string } @@ -21,7 +21,7 @@ func LoaderErrorHandler( ctx context.Context, maxRetryAttempts int, chErrorsIn <-chan LoaderError, - chChunksOut chan<- models.Chunk, + chChunksOut chan<- models.Batch, chJobErrorsOut chan<- JobError, wgActiveChunks *sync.WaitGroup, ) { @@ -59,7 +59,7 @@ func LoaderErrorHandler( err.RetryCounter++ select { - case chChunksOut <- err.Chunk: + case chChunksOut <- err.Batch: case <-ctx.Done(): return } diff --git a/internal/app/etl/extractors/mssql.go b/internal/app/etl/extractors/mssql.go index 1e94a95..629377d 100644 --- a/internal/app/etl/extractors/mssql.go +++ b/internal/app/etl/extractors/mssql.go @@ -73,7 +73,7 @@ func buildExtractQueryMssql( func extractorErrorFromLastRowMssql( lastRow models.UnknownRowValues, indexPrimaryKey int, - batch *models.Batch, + batch *models.Partition, previousError error, ) *custom_errors.ExtractorError { lastIdRawValue := lastRow[indexPrimaryKey] @@ -103,9 +103,9 @@ func (mssqlEx *MssqlExtractor) ProcessBatch( tableInfo config.SourceTableInfo, columns []models.ColumnType, chunkSize int, - batch models.Batch, + batch models.Partition, indexPrimaryKey int, - chChunksOut chan<- models.Chunk, + chChunksOut chan<- models.Batch, rowsRead *int64, ) error { query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive) @@ -142,7 +142,7 @@ func (mssqlEx *MssqlExtractor) ProcessBatch( lastRow := rowsChunk[len(rowsChunk)-1] select { - case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}: + case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}: case <-ctx.Done(): return nil } @@ -156,7 +156,7 @@ func (mssqlEx *MssqlExtractor) ProcessBatch( if len(rowsChunk) >= chunkSize { select { - case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}: + case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}: case <-ctx.Done(): return nil } @@ -181,7 +181,7 @@ func (mssqlEx *MssqlExtractor) ProcessBatch( if len(rowsChunk) > 0 { select { - case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}: + case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}: case <-ctx.Done(): return nil } @@ -197,8 +197,8 @@ func (mssqlEx *MssqlExtractor) Exec( tableInfo config.SourceTableInfo, columns []models.ColumnType, chunkSize int, - chBatchesIn <-chan models.Batch, - chChunksOut chan<- models.Chunk, + chBatchesIn <-chan models.Partition, + chChunksOut chan<- models.Batch, chErrorsOut chan<- custom_errors.ExtractorError, chJobErrorsOut chan<- custom_errors.JobError, wgActiveBatches *sync.WaitGroup, diff --git a/internal/app/etl/extractors/postgres.go b/internal/app/etl/extractors/postgres.go index ece399a..374b5da 100644 --- a/internal/app/etl/extractors/postgres.go +++ b/internal/app/etl/extractors/postgres.go @@ -57,9 +57,9 @@ func (postgresEx *PostgresExtractor) ProcessBatch( tableInfo config.SourceTableInfo, columns []models.ColumnType, chunkSize int, - batch models.Batch, + batch models.Partition, indexPrimaryKey int, - chChunksOut chan<- models.Chunk, + chChunksOut chan<- models.Batch, rowsRead *int64, ) error { query := buildExtractQueryPostgres(tableInfo, columns) @@ -86,7 +86,7 @@ func (postgresEx *PostgresExtractor) ProcessBatch( if len(rowsChunk) >= chunkSize { select { - case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}: + case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}: case <-ctx.Done(): return nil } @@ -102,7 +102,7 @@ func (postgresEx *PostgresExtractor) ProcessBatch( if len(rowsChunk) > 0 { select { - case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}: + case chChunksOut <- models.Batch{Id: uuid.New(), PartitionId: batch.Id, Data: rowsChunk, RetryCounter: 0}: case <-ctx.Done(): return nil } @@ -118,8 +118,8 @@ func (postgresEx *PostgresExtractor) Exec( tableInfo config.SourceTableInfo, columns []models.ColumnType, chunkSize int, - chBatchesIn <-chan models.Batch, - chChunksOut chan<- models.Chunk, + chBatchesIn <-chan models.Partition, + chChunksOut chan<- models.Batch, chErrorsOut chan<- custom_errors.ExtractorError, chJobErrorsOut chan<- custom_errors.JobError, wgActiveBatches *sync.WaitGroup, diff --git a/internal/app/etl/loaders/postgres.go b/internal/app/etl/loaders/postgres.go index 71c8a47..c9d2276 100644 --- a/internal/app/etl/loaders/postgres.go +++ b/internal/app/etl/loaders/postgres.go @@ -38,7 +38,7 @@ func (postgresLd *PostgresLoader) ProcessChunk( ctx context.Context, tableInfo config.TargetTableInfo, colNames []string, - chunk models.Chunk, + chunk models.Batch, ) (int, error) { tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} _, err := postgresLd.db.CopyFrom( @@ -60,7 +60,7 @@ func (postgresLd *PostgresLoader) ProcessChunk( } } - return 0, &custom_errors.LoaderError{Chunk: chunk, Msg: err.Error()} + return 0, &custom_errors.LoaderError{Batch: chunk, Msg: err.Error()} } return len(chunk.Data), nil @@ -70,7 +70,7 @@ func (postgresLd *PostgresLoader) Exec( ctx context.Context, tableInfo config.TargetTableInfo, columns []models.ColumnType, - chChunksIn <-chan models.Chunk, + chChunksIn <-chan models.Batch, chErrorsOut chan<- custom_errors.LoaderError, chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, diff --git a/internal/app/etl/transformers/mssql.go b/internal/app/etl/transformers/mssql.go index 75ffb4f..23dd68e 100644 --- a/internal/app/etl/transformers/mssql.go +++ b/internal/app/etl/transformers/mssql.go @@ -64,7 +64,7 @@ const processChunkCtxCheck = 4096 func (mssqlTr *MssqlTransformer) ProcessChunk( ctx context.Context, - chunk *models.Chunk, + chunk *models.Batch, transformationPlan []etl.ColumnTransformPlan, ) error { for i, rowValues := range chunk.Data { @@ -94,8 +94,8 @@ func (mssqlTr *MssqlTransformer) ProcessChunk( func (mssqlTr *MssqlTransformer) Exec( ctx context.Context, columns []models.ColumnType, - chChunksIn <-chan models.Chunk, - chChunksOut chan<- models.Chunk, + chChunksIn <-chan models.Batch, + chChunksOut chan<- models.Batch, chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, ) { diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index acfc0cf..05fd79d 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -15,9 +15,9 @@ type Extractor interface { tableInfo config.SourceTableInfo, columns []models.ColumnType, chunkSize int, - batch models.Batch, + batch models.Partition, indexPrimaryKey int, - chChunksOut chan<- models.Chunk, + chChunksOut chan<- models.Batch, rowsRead *int64, ) error @@ -26,8 +26,8 @@ type Extractor interface { tableInfo config.SourceTableInfo, columns []models.ColumnType, chunkSize int, - chBatchesIn <-chan models.Batch, - chChunksOut chan<- models.Chunk, + chBatchesIn <-chan models.Partition, + chChunksOut chan<- models.Batch, chErrorsOut chan<- custom_errors.ExtractorError, chJobErrorsOut chan<- custom_errors.JobError, wgActiveBatches *sync.WaitGroup, @@ -45,15 +45,15 @@ type ColumnTransformPlan struct { type Transformer interface { ProcessChunk( ctx context.Context, - chunk *models.Chunk, + chunk *models.Batch, transformationPlan []ColumnTransformPlan, ) error Exec( ctx context.Context, columns []models.ColumnType, - chChunksIn <-chan models.Chunk, - chChunksOut chan<- models.Chunk, + chChunksIn <-chan models.Batch, + chChunksOut chan<- models.Batch, chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, ) @@ -64,17 +64,35 @@ type Loader interface { ctx context.Context, tableInfo config.TargetTableInfo, colNames []string, - chunk models.Chunk, + chunk models.Batch, ) (int, error) Exec( ctx context.Context, tableInfo config.TargetTableInfo, columns []models.ColumnType, - chChunksIn <-chan models.Chunk, + chChunksIn <-chan models.Batch, chErrorsOut chan<- custom_errors.LoaderError, chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, rowsLoaded *int64, ) } + +type TableAnalizer interface { + QueryColumnTypes( + ctx context.Context, + tableInfo config.TableInfo, + ) ([]models.ColumnType, error) + + EstimateTotalRows( + ctx context.Context, + tableInfo config.TableInfo, + ) (int64, error) + + CalculatePartitionRanges( + ctx context.Context, + tableInfo config.TableInfo, + totalPartitions int, + ) (models.Partition, error) +} diff --git a/internal/app/models/main.go b/internal/app/models/main.go index 81b1ffc..6be796e 100644 --- a/internal/app/models/main.go +++ b/internal/app/models/main.go @@ -4,14 +4,14 @@ import "github.com/google/uuid" type UnknownRowValues = []any -type Chunk struct { +type Batch struct { Id uuid.UUID - BatchId uuid.UUID + PartitionId uuid.UUID Data []UnknownRowValues RetryCounter int } -type Batch struct { +type Partition struct { Id uuid.UUID ParentId uuid.UUID LowerLimit int64