From 93b302db8ea88b2ad700b2f1f30dc56d7ed7e378 Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Thu, 16 Apr 2026 16:47:35 -0500 Subject: [PATCH] feat: refactor job result handling and remove unused files --- cmd/go_migrate/connect.go | 77 ------------------------------------- cmd/go_migrate/main.go | 9 +++-- cmd/go_migrate/metrics.go | 13 ------- cmd/go_migrate/process.go | 4 +- internal/app/models/main.go | 16 +++++++- 5 files changed, 22 insertions(+), 97 deletions(-) delete mode 100644 cmd/go_migrate/connect.go delete mode 100644 cmd/go_migrate/metrics.go diff --git a/cmd/go_migrate/connect.go b/cmd/go_migrate/connect.go deleted file mode 100644 index 40e417f..0000000 --- a/cmd/go_migrate/connect.go +++ /dev/null @@ -1,77 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "errors" - "fmt" - "sync" - "time" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "github.com/jackc/pgx/v5/pgxpool" - _ "github.com/microsoft/go-mssqldb" - log "github.com/sirupsen/logrus" -) - -func connectToSqlServer() (*sql.DB, error) { - db, err := sql.Open("sqlserver", config.App.SourceDbUrl) - if err != nil { - return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - if err := db.PingContext(ctx); err != nil { - return nil, fmt.Errorf("Unable to ping sqlserver: %w", err) - } - - return db, nil -} - -func connectToPostgres() (*pgxpool.Pool, error) { - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - pool, err := pgxpool.New(ctx, config.App.TargetDbUrl) - if err != nil { - return nil, fmt.Errorf("Unable to connect to postgres: %w", err) - } - - if err := pool.Ping(ctx); err != nil { - pool.Close() - return nil, fmt.Errorf("Unable to ping postgres: %w", err) - } - - return pool, nil -} - -func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) { - var sourceDbErr, targetDbErr error - var sourceDb *sql.DB - var targetDb *pgxpool.Pool - var wg sync.WaitGroup - - wg.Go(func() { - sourceDb, sourceDbErr = connectToSqlServer() - if sourceDbErr != nil { - log.Error("Unable to connect to source db: ", sourceDbErr) - } - }) - - wg.Go(func() { - targetDb, targetDbErr = connectToPostgres() - if targetDbErr != nil { - log.Error("Unable to connect to target db: ", targetDbErr) - } - }) - - wg.Wait() - - if sourceDbErr != nil || targetDbErr != nil { - return nil, nil, errors.New("Unable to connect to databases") - } - - return sourceDb, targetDb, nil -} diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index e57fe3f..c322f84 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -11,6 +11,7 @@ import ( "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" ) @@ -95,10 +96,10 @@ func processMigrationJobs( targetDb dbwrapper.DbWrapper, jobs []config.Job, maxParallelWorkers int, -) []JobResult { +) []models.JobResult { if len(jobs) == 0 { log.Info("No migration jobs configured") - return []JobResult{} + return []models.JobResult{} } if maxParallelWorkers <= 0 { @@ -111,7 +112,7 @@ func processMigrationJobs( log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers) - chJobResults := make(chan JobResult, len(jobs)) + chJobResults := make(chan models.JobResult, len(jobs)) chJobs := make(chan config.Job, len(jobs)) var wgJobs sync.WaitGroup @@ -151,7 +152,7 @@ func processMigrationJobs( close(chJobResults) }() - var finalResults []JobResult + var finalResults []models.JobResult for res := range chJobResults { finalResults = append(finalResults, res) } diff --git a/cmd/go_migrate/metrics.go b/cmd/go_migrate/metrics.go deleted file mode 100644 index b540c8c..0000000 --- a/cmd/go_migrate/metrics.go +++ /dev/null @@ -1,13 +0,0 @@ -package main - -import "time" - -type JobResult struct { - JobName string - StartTime time.Time - Duration time.Duration - RowsRead int64 - RowsLoaded int64 - RowsFailed int64 - Error error -} diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index e525299..002d0b8 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -26,11 +26,11 @@ func processMigrationJob( transformer etl.Transformer, loader etl.Loader, job config.Job, -) JobResult { +) models.JobResult { localCtx, cancel := context.WithCancel(ctx) defer cancel() - result := JobResult{ + result := models.JobResult{ JobName: job.Name, StartTime: time.Now(), } diff --git a/internal/app/models/main.go b/internal/app/models/main.go index 6156a86..60eb73e 100644 --- a/internal/app/models/main.go +++ b/internal/app/models/main.go @@ -1,6 +1,10 @@ package models -import "github.com/google/uuid" +import ( + "time" + + "github.com/google/uuid" +) type UnknownRowValues = []any @@ -25,3 +29,13 @@ type Partition struct { HasRange bool RetryCounter int } + +type JobResult struct { + JobName string + StartTime time.Time + Duration time.Duration + RowsRead int64 + RowsLoaded int64 + RowsFailed int64 + Error error +}