diff --git a/cmd/go_migrate/batch-generator.go b/cmd/go_migrate/batch-generator.go index 2732b54..90791e9 100644 --- a/cmd/go_migrate/batch-generator.go +++ b/cmd/go_migrate/batch-generator.go @@ -7,6 +7,7 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/google/uuid" ) @@ -42,7 +43,7 @@ GROUP BY t.name` return rowsCount, nil } -func calculateBatchesMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, batchCount int64) ([]Batch, error) { +func calculateBatchesMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, batchCount int64) ([]models.Batch, error) { query := fmt.Sprintf(` SELECT MIN([%s]) AS lower_limit, @@ -67,10 +68,10 @@ ORDER BY batch_id`, } defer rows.Close() - batches := make([]Batch, 0, batchCount) + batches := make([]models.Batch, 0, batchCount) for rows.Next() { - batch := Batch{ + batch := models.Batch{ Id: uuid.New(), ShouldUseRange: true, RetryCounter: 0, @@ -91,7 +92,7 @@ ORDER BY batch_id`, return batches, nil } -func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]Batch, error) { +func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]models.Batch, error) { rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo) if err != nil { return nil, err @@ -101,7 +102,7 @@ func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.Sourc if rowsCount > rowsPerBatch { batchCount = rowsCount / rowsPerBatch } else { - return []Batch{{ + return []models.Batch{{ Id: uuid.New(), ShouldUseRange: false, RetryCounter: 0, diff --git a/cmd/go_migrate/inspect-columns.go b/cmd/go_migrate/inspect-columns.go index e6936c4..6da878c 100644 --- a/cmd/go_migrate/inspect-columns.go +++ b/cmd/go_migrate/inspect-columns.go @@ -10,6 +10,7 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" @@ -67,7 +68,7 @@ func GetUnifiedType(systemType string) string { return strings.ToUpper(systemType) } -func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, scale *int64) ColumnType { +func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, scale *int64) models.ColumnType { stringTypes := map[string]bool{ "varchar": true, "char": true, "character": true, "text": true, "character varying": true, } @@ -109,10 +110,23 @@ func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, sc column.unifiedType = GetUnifiedType(column.systemType) - return column + colType := models.NewColumnType( + column.name, + column.hasMaxLength, + column.hasPrecisionScale, + column.userType, + column.systemType, + column.unifiedType, + column.nullable, + column.maxLength, + column.precision, + column.scale, + ) + + return colType } -func GetColumnTypesPostgres(db *pgxpool.Pool, tableInfo config.TargetTableInfo) ([]ColumnType, error) { +func GetColumnTypesPostgres(db *pgxpool.Pool, tableInfo config.TargetTableInfo) ([]models.ColumnType, error) { query := ` SELECT c.column_name AS name, @@ -136,7 +150,7 @@ ORDER BY c.ordinal_position; } defer rows.Close() - var colTypes []ColumnType + var colTypes []models.ColumnType for rows.Next() { var column ColumnType @@ -162,7 +176,7 @@ ORDER BY c.ordinal_position; return colTypes, nil } -func MapMssqlColumn(column ColumnType) ColumnType { +func MapMssqlColumn(column ColumnType) models.ColumnType { stringTypes := map[string]bool{ "varchar": true, "char": true, "nvarchar": true, "nchar": true, "text": true, "ntext": true, } @@ -195,10 +209,23 @@ func MapMssqlColumn(column ColumnType) ColumnType { column.unifiedType = GetUnifiedType(column.systemType) - return column + colType := models.NewColumnType( + column.name, + column.hasMaxLength, + column.hasPrecisionScale, + column.userType, + column.systemType, + column.unifiedType, + column.nullable, + column.maxLength, + column.precision, + column.scale, + ) + + return colType } -func GetColumnTypesMssql(db *sql.DB, tableInfo config.SourceTableInfo) ([]ColumnType, error) { +func GetColumnTypesMssql(db *sql.DB, tableInfo config.SourceTableInfo) ([]models.ColumnType, error) { query := ` SELECT c.name AS name, @@ -226,7 +253,7 @@ ORDER BY c.column_id; } defer rows.Close() - var colTypes []ColumnType + var colTypes []models.ColumnType for rows.Next() { var column ColumnType @@ -258,11 +285,11 @@ func GetColumnTypes( targetDb *pgxpool.Pool, sourceTable config.SourceTableInfo, targetTable config.TargetTableInfo, -) ([]ColumnType, []ColumnType, error) { +) ([]models.ColumnType, []models.ColumnType, error) { var sourceDbErr error var targetDbErr error - var sourceColTypes []ColumnType - var targetColTypes []ColumnType + var sourceColTypes []models.ColumnType + var targetColTypes []models.ColumnType var wg sync.WaitGroup wg.Go(func() { diff --git a/cmd/go_migrate/loader-error-handler.go b/cmd/go_migrate/loader-error-handler.go index 205fa73..bb40a4c 100644 --- a/cmd/go_migrate/loader-error-handler.go +++ b/cmd/go_migrate/loader-error-handler.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "sync" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" ) type LoaderError struct { - Chunk + models.Chunk Msg string } @@ -18,8 +21,8 @@ func (e *LoaderError) Error() string { func loaderErrorHandler( ctx context.Context, chErrorsIn <-chan LoaderError, - chChunksOut chan<- Chunk, - chJobErrorsOut chan<- JobError, + chChunksOut chan<- models.Chunk, + chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, ) { for { @@ -37,7 +40,7 @@ func loaderErrorHandler( } if err.RetryCounter >= maxRetryAttempts { - jobError := JobError{ + jobError := custom_errors.JobError{ ShouldCancelJob: false, Msg: fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts), Prev: &err, diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go index 42e56af..1ded1fe 100644 --- a/cmd/go_migrate/loader.go +++ b/cmd/go_migrate/loader.go @@ -10,6 +10,8 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" @@ -21,16 +23,16 @@ func loadRowsPostgres( ctx context.Context, db *pgxpool.Pool, tableInfo config.TargetTableInfo, - columns []ColumnType, - chChunksIn <-chan Chunk, + columns []models.ColumnType, + chChunksIn <-chan models.Chunk, chErrorsOut chan<- LoaderError, - chJobErrorsOut chan<- JobError, + chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, rowsLoaded *int64, ) { tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} - colNames := Map(columns, func(col ColumnType) string { - return col.name + colNames := Map(columns, func(col models.ColumnType) string { + return col.Name() }) for { @@ -58,9 +60,9 @@ func loadChunkPostgres( db *pgxpool.Pool, identifier pgx.Identifier, colNames []string, - chunk Chunk, + chunk models.Chunk, chErrorsOut chan<- LoaderError, - chJobErrorsOut chan<- JobError, + chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, rowsLoaded *int64, ) (abort bool) { @@ -77,7 +79,7 @@ func loadChunkPostgres( if errors.As(err, &pgErr) { if pgErr.Code == "23505" { select { - case chJobErrorsOut <- JobError{ + case chJobErrorsOut <- custom_errors.JobError{ ShouldCancelJob: true, Msg: fmt.Sprintf("Fatal error in table %s", identifier.Sanitize()), Prev: err, diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index e274e4d..bb32c0f 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -8,6 +8,9 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" @@ -44,11 +47,11 @@ func processMigrationJob( log.Error("Unexpected error calculating batch ranges: ", err) } - chJobErrors := make(chan JobError, job.QueueSize) - chBatches := make(chan Batch, job.QueueSize) - chExtractorErrors := make(chan ExtractorError, job.QueueSize) - chChunksRaw := make(chan Chunk, job.QueueSize) - chChunksTransformed := make(chan Chunk, job.QueueSize) + chJobErrors := make(chan custom_errors.JobError, job.QueueSize) + chBatches := make(chan models.Batch, job.QueueSize) + chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize) + chChunksRaw := make(chan models.Chunk, job.QueueSize) + chChunksTransformed := make(chan models.Chunk, job.QueueSize) chLoadersErrors := make(chan LoaderError, job.QueueSize) var wgActiveBatches sync.WaitGroup @@ -58,21 +61,34 @@ func processMigrationJob( var wgLoaders sync.WaitGroup go func() { - if err := jobErrorHandler(jobCtx, chJobErrors); err != nil { + if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil { cancel() result.Error = err } }() - go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) + go custom_errors.ExtractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) maxExtractors := min(job.MaxExtractors, len(batches)) log.Infof("Starting %d extractor(s)...", maxExtractors) + exMssql := extractor.NewMssqlExtractor(sourceDb) + for range maxExtractors { wgExtractors.Go(func() { - extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, job.ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches, &rowsRead) + exMssql.Exec( + jobCtx, + job.SourceTable, + sourceColTypes, + job.ChunkSize, + chBatches, + chChunksRaw, + chExtractorErrors, + chJobErrors, + &wgActiveBatches, + &rowsRead, + ) }) } @@ -132,7 +148,7 @@ func processMigrationJob( return result } -func logColumnTypes(columnTypes []ColumnType, label string) { +func logColumnTypes(columnTypes []models.ColumnType, label string) { log.Debug(label) for _, col := range columnTypes { diff --git a/cmd/go_migrate/transformer.go b/cmd/go_migrate/transformer.go index 4107a0c..2ceba6f 100644 --- a/cmd/go_migrate/transformer.go +++ b/cmd/go_migrate/transformer.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" log "github.com/sirupsen/logrus" ) @@ -18,10 +20,10 @@ type columnTransformPlan struct { func transformRowsMssql( ctx context.Context, - columns []ColumnType, - chChunksIn <-chan Chunk, - chChunksOut chan<- Chunk, - chJobErrorsOut chan<- JobError, + columns []models.ColumnType, + chChunksIn <-chan models.Chunk, + chChunksOut chan<- models.Chunk, + chJobErrorsOut chan<- custom_errors.JobError, wgActiveChunks *sync.WaitGroup, ) { transformationPlan := computeTransformationPlan(columns) @@ -59,7 +61,7 @@ func transformRowsMssql( } select { - case chJobErrorsOut <- JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: + case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: case <-ctx.Done(): } return @@ -78,7 +80,7 @@ func transformRowsMssql( } } -func computeTransformationPlan(columns []ColumnType) []columnTransformPlan { +func computeTransformationPlan(columns []models.ColumnType) []columnTransformPlan { var plan []columnTransformPlan for i, col := range columns { @@ -123,7 +125,7 @@ func computeTransformationPlan(columns []ColumnType) []columnTransformPlan { const processChunkCtxCheck = 4096 -func processChunk(ctx context.Context, chunk *Chunk, transformationPlan []columnTransformPlan) error { +func processChunk(ctx context.Context, chunk *models.Chunk, transformationPlan []columnTransformPlan) error { for i, rowValues := range chunk.Data { if i%processChunkCtxCheck == 0 { if err := ctx.Err(); err != nil { diff --git a/internal/app/models/colum-type.go b/internal/app/models/colum-type.go index b0b7bbd..81e6a62 100644 --- a/internal/app/models/colum-type.go +++ b/internal/app/models/colum-type.go @@ -42,3 +42,29 @@ func (c *ColumnType) Nullable() bool { func (c *ColumnType) Type() string { return c.unifiedType } + +func NewColumnType( + name string, + hasMaxLength bool, + hasPrecisionScale bool, + userType string, + systemType string, + unifiedType string, + nullable bool, + maxLength int64, + precision int64, + scale int64, +) ColumnType { + return ColumnType{ + name, + hasMaxLength, + hasPrecisionScale, + userType, + systemType, + unifiedType, + nullable, + maxLength, + precision, + scale, + } +}