feat: refactor models to improve type handling and enhance error management across migration processes

This commit is contained in:
2026-04-10 19:27:27 -05:00
parent c2ea84bfcf
commit ca621352c9
7 changed files with 121 additions and 44 deletions

View File

@@ -7,6 +7,7 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid" "github.com/google/uuid"
) )
@@ -42,7 +43,7 @@ GROUP BY t.name`
return rowsCount, nil return rowsCount, nil
} }
func calculateBatchesMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, batchCount int64) ([]Batch, error) { func calculateBatchesMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, batchCount int64) ([]models.Batch, error) {
query := fmt.Sprintf(` query := fmt.Sprintf(`
SELECT SELECT
MIN([%s]) AS lower_limit, MIN([%s]) AS lower_limit,
@@ -67,10 +68,10 @@ ORDER BY batch_id`,
} }
defer rows.Close() defer rows.Close()
batches := make([]Batch, 0, batchCount) batches := make([]models.Batch, 0, batchCount)
for rows.Next() { for rows.Next() {
batch := Batch{ batch := models.Batch{
Id: uuid.New(), Id: uuid.New(),
ShouldUseRange: true, ShouldUseRange: true,
RetryCounter: 0, RetryCounter: 0,
@@ -91,7 +92,7 @@ ORDER BY batch_id`,
return batches, nil return batches, nil
} }
func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]Batch, error) { func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, rowsPerBatch int64) ([]models.Batch, error) {
rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo) rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -101,7 +102,7 @@ func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.Sourc
if rowsCount > rowsPerBatch { if rowsCount > rowsPerBatch {
batchCount = rowsCount / rowsPerBatch batchCount = rowsCount / rowsPerBatch
} else { } else {
return []Batch{{ return []models.Batch{{
Id: uuid.New(), Id: uuid.New(),
ShouldUseRange: false, ShouldUseRange: false,
RetryCounter: 0, RetryCounter: 0,

View File

@@ -10,6 +10,7 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb" _ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -67,7 +68,7 @@ func GetUnifiedType(systemType string) string {
return strings.ToUpper(systemType) return strings.ToUpper(systemType)
} }
func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, scale *int64) ColumnType { func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, scale *int64) models.ColumnType {
stringTypes := map[string]bool{ stringTypes := map[string]bool{
"varchar": true, "char": true, "character": true, "text": true, "character varying": true, "varchar": true, "char": true, "character": true, "text": true, "character varying": true,
} }
@@ -109,10 +110,23 @@ func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, sc
column.unifiedType = GetUnifiedType(column.systemType) column.unifiedType = GetUnifiedType(column.systemType)
return column colType := models.NewColumnType(
column.name,
column.hasMaxLength,
column.hasPrecisionScale,
column.userType,
column.systemType,
column.unifiedType,
column.nullable,
column.maxLength,
column.precision,
column.scale,
)
return colType
} }
func GetColumnTypesPostgres(db *pgxpool.Pool, tableInfo config.TargetTableInfo) ([]ColumnType, error) { func GetColumnTypesPostgres(db *pgxpool.Pool, tableInfo config.TargetTableInfo) ([]models.ColumnType, error) {
query := ` query := `
SELECT SELECT
c.column_name AS name, c.column_name AS name,
@@ -136,7 +150,7 @@ ORDER BY c.ordinal_position;
} }
defer rows.Close() defer rows.Close()
var colTypes []ColumnType var colTypes []models.ColumnType
for rows.Next() { for rows.Next() {
var column ColumnType var column ColumnType
@@ -162,7 +176,7 @@ ORDER BY c.ordinal_position;
return colTypes, nil return colTypes, nil
} }
func MapMssqlColumn(column ColumnType) ColumnType { func MapMssqlColumn(column ColumnType) models.ColumnType {
stringTypes := map[string]bool{ stringTypes := map[string]bool{
"varchar": true, "char": true, "nvarchar": true, "nchar": true, "text": true, "ntext": true, "varchar": true, "char": true, "nvarchar": true, "nchar": true, "text": true, "ntext": true,
} }
@@ -195,10 +209,23 @@ func MapMssqlColumn(column ColumnType) ColumnType {
column.unifiedType = GetUnifiedType(column.systemType) column.unifiedType = GetUnifiedType(column.systemType)
return column colType := models.NewColumnType(
column.name,
column.hasMaxLength,
column.hasPrecisionScale,
column.userType,
column.systemType,
column.unifiedType,
column.nullable,
column.maxLength,
column.precision,
column.scale,
)
return colType
} }
func GetColumnTypesMssql(db *sql.DB, tableInfo config.SourceTableInfo) ([]ColumnType, error) { func GetColumnTypesMssql(db *sql.DB, tableInfo config.SourceTableInfo) ([]models.ColumnType, error) {
query := ` query := `
SELECT SELECT
c.name AS name, c.name AS name,
@@ -226,7 +253,7 @@ ORDER BY c.column_id;
} }
defer rows.Close() defer rows.Close()
var colTypes []ColumnType var colTypes []models.ColumnType
for rows.Next() { for rows.Next() {
var column ColumnType var column ColumnType
@@ -258,11 +285,11 @@ func GetColumnTypes(
targetDb *pgxpool.Pool, targetDb *pgxpool.Pool,
sourceTable config.SourceTableInfo, sourceTable config.SourceTableInfo,
targetTable config.TargetTableInfo, targetTable config.TargetTableInfo,
) ([]ColumnType, []ColumnType, error) { ) ([]models.ColumnType, []models.ColumnType, error) {
var sourceDbErr error var sourceDbErr error
var targetDbErr error var targetDbErr error
var sourceColTypes []ColumnType var sourceColTypes []models.ColumnType
var targetColTypes []ColumnType var targetColTypes []models.ColumnType
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Go(func() { wg.Go(func() {

View File

@@ -4,10 +4,13 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
) )
type LoaderError struct { type LoaderError struct {
Chunk models.Chunk
Msg string Msg string
} }
@@ -18,8 +21,8 @@ func (e *LoaderError) Error() string {
func loaderErrorHandler( func loaderErrorHandler(
ctx context.Context, ctx context.Context,
chErrorsIn <-chan LoaderError, chErrorsIn <-chan LoaderError,
chChunksOut chan<- Chunk, chChunksOut chan<- models.Chunk,
chJobErrorsOut chan<- JobError, chJobErrorsOut chan<- custom_errors.JobError,
wgActiveChunks *sync.WaitGroup, wgActiveChunks *sync.WaitGroup,
) { ) {
for { for {
@@ -37,7 +40,7 @@ func loaderErrorHandler(
} }
if err.RetryCounter >= maxRetryAttempts { if err.RetryCounter >= maxRetryAttempts {
jobError := JobError{ jobError := custom_errors.JobError{
ShouldCancelJob: false, ShouldCancelJob: false,
Msg: fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts), Msg: fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts),
Prev: &err, Prev: &err,

View File

@@ -10,6 +10,8 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
@@ -21,16 +23,16 @@ func loadRowsPostgres(
ctx context.Context, ctx context.Context,
db *pgxpool.Pool, db *pgxpool.Pool,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []ColumnType, columns []models.ColumnType,
chChunksIn <-chan Chunk, chChunksIn <-chan models.Chunk,
chErrorsOut chan<- LoaderError, chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError, chJobErrorsOut chan<- custom_errors.JobError,
wgActiveChunks *sync.WaitGroup, wgActiveChunks *sync.WaitGroup,
rowsLoaded *int64, rowsLoaded *int64,
) { ) {
tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
colNames := Map(columns, func(col ColumnType) string { colNames := Map(columns, func(col models.ColumnType) string {
return col.name return col.Name()
}) })
for { for {
@@ -58,9 +60,9 @@ func loadChunkPostgres(
db *pgxpool.Pool, db *pgxpool.Pool,
identifier pgx.Identifier, identifier pgx.Identifier,
colNames []string, colNames []string,
chunk Chunk, chunk models.Chunk,
chErrorsOut chan<- LoaderError, chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError, chJobErrorsOut chan<- custom_errors.JobError,
wgActiveChunks *sync.WaitGroup, wgActiveChunks *sync.WaitGroup,
rowsLoaded *int64, rowsLoaded *int64,
) (abort bool) { ) (abort bool) {
@@ -77,7 +79,7 @@ func loadChunkPostgres(
if errors.As(err, &pgErr) { if errors.As(err, &pgErr) {
if pgErr.Code == "23505" { if pgErr.Code == "23505" {
select { select {
case chJobErrorsOut <- JobError{ case chJobErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true, ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s", identifier.Sanitize()), Msg: fmt.Sprintf("Fatal error in table %s", identifier.Sanitize()),
Prev: err, Prev: err,

View File

@@ -8,6 +8,9 @@ import (
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb" _ "github.com/microsoft/go-mssqldb"
@@ -44,11 +47,11 @@ func processMigrationJob(
log.Error("Unexpected error calculating batch ranges: ", err) log.Error("Unexpected error calculating batch ranges: ", err)
} }
chJobErrors := make(chan JobError, job.QueueSize) chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
chBatches := make(chan Batch, job.QueueSize) chBatches := make(chan models.Batch, job.QueueSize)
chExtractorErrors := make(chan ExtractorError, job.QueueSize) chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
chChunksRaw := make(chan Chunk, job.QueueSize) chChunksRaw := make(chan models.Chunk, job.QueueSize)
chChunksTransformed := make(chan Chunk, job.QueueSize) chChunksTransformed := make(chan models.Chunk, job.QueueSize)
chLoadersErrors := make(chan LoaderError, job.QueueSize) chLoadersErrors := make(chan LoaderError, job.QueueSize)
var wgActiveBatches sync.WaitGroup var wgActiveBatches sync.WaitGroup
@@ -58,21 +61,34 @@ func processMigrationJob(
var wgLoaders sync.WaitGroup var wgLoaders sync.WaitGroup
go func() { go func() {
if err := jobErrorHandler(jobCtx, chJobErrors); err != nil { if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
cancel() cancel()
result.Error = err result.Error = err
} }
}() }()
go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) go custom_errors.ExtractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
maxExtractors := min(job.MaxExtractors, len(batches)) maxExtractors := min(job.MaxExtractors, len(batches))
log.Infof("Starting %d extractor(s)...", maxExtractors) log.Infof("Starting %d extractor(s)...", maxExtractors)
exMssql := extractor.NewMssqlExtractor(sourceDb)
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, job.ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches, &rowsRead) exMssql.Exec(
jobCtx,
job.SourceTable,
sourceColTypes,
job.ChunkSize,
chBatches,
chChunksRaw,
chExtractorErrors,
chJobErrors,
&wgActiveBatches,
&rowsRead,
)
}) })
} }
@@ -132,7 +148,7 @@ func processMigrationJob(
return result return result
} }
func logColumnTypes(columnTypes []ColumnType, label string) { func logColumnTypes(columnTypes []models.ColumnType, label string) {
log.Debug(label) log.Debug(label)
for _, col := range columnTypes { for _, col := range columnTypes {

View File

@@ -6,6 +6,8 @@ import (
"sync" "sync"
"time" "time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@@ -18,10 +20,10 @@ type columnTransformPlan struct {
func transformRowsMssql( func transformRowsMssql(
ctx context.Context, ctx context.Context,
columns []ColumnType, columns []models.ColumnType,
chChunksIn <-chan Chunk, chChunksIn <-chan models.Chunk,
chChunksOut chan<- Chunk, chChunksOut chan<- models.Chunk,
chJobErrorsOut chan<- JobError, chJobErrorsOut chan<- custom_errors.JobError,
wgActiveChunks *sync.WaitGroup, wgActiveChunks *sync.WaitGroup,
) { ) {
transformationPlan := computeTransformationPlan(columns) transformationPlan := computeTransformationPlan(columns)
@@ -59,7 +61,7 @@ func transformRowsMssql(
} }
select { select {
case chJobErrorsOut <- JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}: case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done(): case <-ctx.Done():
} }
return return
@@ -78,7 +80,7 @@ func transformRowsMssql(
} }
} }
func computeTransformationPlan(columns []ColumnType) []columnTransformPlan { func computeTransformationPlan(columns []models.ColumnType) []columnTransformPlan {
var plan []columnTransformPlan var plan []columnTransformPlan
for i, col := range columns { for i, col := range columns {
@@ -123,7 +125,7 @@ func computeTransformationPlan(columns []ColumnType) []columnTransformPlan {
const processChunkCtxCheck = 4096 const processChunkCtxCheck = 4096
func processChunk(ctx context.Context, chunk *Chunk, transformationPlan []columnTransformPlan) error { func processChunk(ctx context.Context, chunk *models.Chunk, transformationPlan []columnTransformPlan) error {
for i, rowValues := range chunk.Data { for i, rowValues := range chunk.Data {
if i%processChunkCtxCheck == 0 { if i%processChunkCtxCheck == 0 {
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {

View File

@@ -42,3 +42,29 @@ func (c *ColumnType) Nullable() bool {
func (c *ColumnType) Type() string { func (c *ColumnType) Type() string {
return c.unifiedType return c.unifiedType
} }
// NewColumnType builds a ColumnType value from the given column attributes
// and returns it by value.
//
// The struct is populated with an unkeyed (positional) composite literal, so
// each argument is assigned to a ColumnType field purely by position: the
// order of the values below must mirror the field declaration order of
// ColumnType, and every field of the struct must be listed.
//
// NOTE(review): hasMaxLength and hasPrecisionScale presumably indicate whether
// maxLength and precision/scale carry meaningful values — confirm against the
// ColumnType definition and its callers.
func NewColumnType(
name string,
hasMaxLength bool,
hasPrecisionScale bool,
userType string,
systemType string,
unifiedType string,
nullable bool,
maxLength int64,
precision int64,
scale int64,
) ColumnType {
// Positional literal: keep this list in sync with the ColumnType field order.
return ColumnType{
name,
hasMaxLength,
hasPrecisionScale,
userType,
systemType,
unifiedType,
nullable,
maxLength,
precision,
scale,
}
}