From 33c9cd9c3e005eee81649839d5885ed3e7798d14 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Mon, 13 Apr 2026 07:57:18 -0500 Subject: [PATCH] feat: implement database wrapper interfaces for MSSQL and Postgres; enhance migration job processing with pre and post SQL execution --- cmd/go_migrate/log.go | 2 +- cmd/go_migrate/main.go | 7 ++++++- cmd/go_migrate/process.go | 39 ++++++++++++++++++++++++++----------- config.yaml | 6 ++++++ internal/app/db/mssql.go | 30 ++++++++++++++++++++++++++++ internal/app/db/postgres.go | 19 ++++++++++++++++++ internal/app/db/types.go | 11 +++++++++++ 7 files changed, 101 insertions(+), 13 deletions(-) create mode 100644 internal/app/db/mssql.go create mode 100644 internal/app/db/types.go diff --git a/cmd/go_migrate/log.go b/cmd/go_migrate/log.go index a7bad26..a7afee3 100644 --- a/cmd/go_migrate/log.go +++ b/cmd/go_migrate/log.go @@ -13,5 +13,5 @@ func configureLog() { DisableSorting: false, PadLevelText: true, }) - log.SetLevel(log.InfoLevel) + log.SetLevel(log.DebugLevel) } diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 5e399fd..3313d99 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -7,6 +7,7 @@ import ( "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" @@ -49,8 +50,10 @@ func main() { status := "OK" if res.Error != nil { status = "FAILED" + log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v | Error: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration, res.Error) + } else { + log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration) } - log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration) totalProcessed += res.RowsLoaded if res.Error != nil { @@ -91,6 +94,7 @@ func processMigrationJobs( chJobs := make(chan config.Job, len(jobs)) var wgJobs sync.WaitGroup + targetDbWrapper := db.NewPostgresDbWrapper(targetDb) sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) extractor := extractors.NewMssqlExtractor(sourceDb) @@ -103,6 +107,7 @@ func processMigrationJobs( log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) res := processMigrationJob( ctx, + targetDbWrapper, sourceTableAnalyzer, targetTableAnalyzer, extractor, diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index d159dc5..0886ca2 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -8,6 +8,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" @@ -17,6 +18,8 @@ import ( func processMigrationJob( ctx context.Context, + // sourceDbWrapper db.DbWrapper, + targetDbWrapper db.DbWrapper, sourceTableAnalyzer etl.TableAnalyzer, targetTableAnalyzer etl.TableAnalyzer, extractor etl.Extractor, @@ -24,7 +27,7 @@ func processMigrationJob( loader etl.Loader, job config.Job, ) JobResult { - jobCtx, cancel := context.WithCancel(ctx) + localCtx, cancel := context.WithCancel(ctx) defer cancel() result := JobResult{ @@ -39,7 +42,7 @@ func processMigrationJob( wgQueryColumnTypes.Go(func() error { var err error - sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(jobCtx, job.SourceTable.TableInfo) + sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo) if err != nil { return err } @@ -49,7 +52,7 @@ func processMigrationJob( wgQueryColumnTypes.Go(func() error { var err error - targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(jobCtx, job.TargetTable.TableInfo) + targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo) if err != nil { return err } @@ -63,8 +66,15 @@ func processMigrationJob( return result } + for _, query := range job.PreSQL { + if _, err := targetDbWrapper.Exec(localCtx, query); err != nil { + result.Error = err + return result + } + } + partitions, err := table_analyzers.PartitionRangeGenerator( - jobCtx, + localCtx, sourceTableAnalyzer, job.SourceTable.TableInfo, job.SourceTable.PrimaryKey, @@ -88,7 +98,7 @@ func processMigrationJob( var wgLoaders sync.WaitGroup go func() { - if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil { + if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil { log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err) cancel() result.Error = err @@ -96,7 +106,7 @@ func processMigrationJob( }() go custom_errors.ExtractorErrorHandler( - jobCtx, + localCtx, job.Retry, job.MaxPartitionErrrors, chExtractorErrors, @@ -105,7 +115,7 @@ func processMigrationJob( &wgActivePartitions, ) go custom_errors.LoaderErrorHandler( - jobCtx, + localCtx, job.Retry, job.MaxChunkErrors, chLoadersErrors, @@ -120,7 +130,7 @@ func processMigrationJob( for range maxExtractors { wgExtractors.Go(func() { extractor.Exec( - jobCtx, + localCtx, job.SourceTable, sourceColTypes, job.BatchSize, @@ -146,7 +156,7 @@ func processMigrationJob( for range maxExtractors { wgTransformers.Go(func() { transformer.Exec( - jobCtx, + localCtx, sourceColTypes, chBatchesRaw, chBatchesTransformed, @@ -161,7 +171,7 @@ func processMigrationJob( for range job.MaxLoaders { wgLoaders.Go(func() { loader.Exec( - jobCtx, + localCtx, job.TargetTable, targetColTypes, chBatchesTransformed, @@ -204,8 +214,15 @@ func processMigrationJob( cancel() }() + for _, query := range job.PostSQL { + if _, err := targetDbWrapper.Exec(localCtx, query); err != nil { + result.Error = err + return result + } + } + log.Debugf("waiting for local context to be done (%v)", job.Name) - <-jobCtx.Done() + <-localCtx.Done() log.Debugf("local context done (%v)", job.Name) if ctx.Err() != nil { diff --git a/config.yaml b/config.yaml index cad138e..b999242 100644 --- a/config.yaml +++ b/config.yaml @@ -28,6 +28,8 @@ jobs: target: schema: Cartografia table: MANZANA + pre_sql: + - 'SELECT 1' - name: red_puerto enabled: true @@ -38,3 +40,7 @@ jobs: target: schema: Red table: PUERTO + pre_sql: + - 'SELECT 1' + post_sql: + - "SELECT 1" diff --git a/internal/app/db/mssql.go b/internal/app/db/mssql.go new file mode 100644 index 0000000..2314c65 --- /dev/null +++ b/internal/app/db/mssql.go @@ -0,0 +1,30 @@ +package db + +import ( + "context" + "database/sql" +) + +type MssqlDbWrapper struct { + db *sql.DB +} + +func NewMssqlDbWrapper(db *sql.DB) DbWrapper { + return &MssqlDbWrapper{db: db} +} + +func (wrapper *MssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) { + result, execErr := wrapper.db.ExecContext(ctx, query, args...) + if execErr != nil { + return DbWrapperResult{}, execErr + } + + affectedRows, err := result.RowsAffected() + if err != nil { + return DbWrapperResult{}, err + } + + return DbWrapperResult{ + AffectedRows: affectedRows, + }, nil +} diff --git a/internal/app/db/postgres.go b/internal/app/db/postgres.go index d675916..81f3c58 100644 --- a/internal/app/db/postgres.go +++ b/internal/app/db/postgres.go @@ -26,3 +26,22 @@ func Close(pool *pgxpool.Pool) { pool.Close() } } + +type PostgresDbWrapper struct { + db *pgxpool.Pool +} + +func NewPostgresDbWrapper(db *pgxpool.Pool) DbWrapper { + return &PostgresDbWrapper{db: db} +} + +func (wrapper *PostgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) { + result, err := wrapper.db.Exec(ctx, query, args...) + if err != nil { + return DbWrapperResult{}, err + } + + return DbWrapperResult{ + AffectedRows: result.RowsAffected(), + }, nil +} diff --git a/internal/app/db/types.go b/internal/app/db/types.go new file mode 100644 index 0000000..ea72117 --- /dev/null +++ b/internal/app/db/types.go @@ -0,0 +1,11 @@ +package db + +import "context" + +type DbWrapperResult struct { + AffectedRows int64 +} + +type DbWrapper interface { + Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) +}