feat: refactor job result handling and remove unused files
This commit is contained in:
@@ -1,77 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
_ "github.com/microsoft/go-mssqldb"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func connectToSqlServer() (*sql.DB, error) {
|
||||
db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func connectToPostgres() (*pgxpool.Pool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||
defer cancel()
|
||||
|
||||
pool, err := pgxpool.New(ctx, config.App.TargetDbUrl)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Unable to connect to postgres: %w", err)
|
||||
}
|
||||
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("Unable to ping postgres: %w", err)
|
||||
}
|
||||
|
||||
return pool, nil
|
||||
}
|
||||
|
||||
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
|
||||
var sourceDbErr, targetDbErr error
|
||||
var sourceDb *sql.DB
|
||||
var targetDb *pgxpool.Pool
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Go(func() {
|
||||
sourceDb, sourceDbErr = connectToSqlServer()
|
||||
if sourceDbErr != nil {
|
||||
log.Error("Unable to connect to source db: ", sourceDbErr)
|
||||
}
|
||||
})
|
||||
|
||||
wg.Go(func() {
|
||||
targetDb, targetDbErr = connectToPostgres()
|
||||
if targetDbErr != nil {
|
||||
log.Error("Unable to connect to target db: ", targetDbErr)
|
||||
}
|
||||
})
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if sourceDbErr != nil || targetDbErr != nil {
|
||||
return nil, nil, errors.New("Unable to connect to databases")
|
||||
}
|
||||
|
||||
return sourceDb, targetDb, nil
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
@@ -95,10 +96,10 @@ func processMigrationJobs(
|
||||
targetDb dbwrapper.DbWrapper,
|
||||
jobs []config.Job,
|
||||
maxParallelWorkers int,
|
||||
) []JobResult {
|
||||
) []models.JobResult {
|
||||
if len(jobs) == 0 {
|
||||
log.Info("No migration jobs configured")
|
||||
return []JobResult{}
|
||||
return []models.JobResult{}
|
||||
}
|
||||
|
||||
if maxParallelWorkers <= 0 {
|
||||
@@ -111,7 +112,7 @@ func processMigrationJobs(
|
||||
|
||||
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
|
||||
|
||||
chJobResults := make(chan JobResult, len(jobs))
|
||||
chJobResults := make(chan models.JobResult, len(jobs))
|
||||
chJobs := make(chan config.Job, len(jobs))
|
||||
var wgJobs sync.WaitGroup
|
||||
|
||||
@@ -151,7 +152,7 @@ func processMigrationJobs(
|
||||
close(chJobResults)
|
||||
}()
|
||||
|
||||
var finalResults []JobResult
|
||||
var finalResults []models.JobResult
|
||||
for res := range chJobResults {
|
||||
finalResults = append(finalResults, res)
|
||||
}
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
package main
|
||||
|
||||
import "time"
|
||||
|
||||
type JobResult struct {
|
||||
JobName string
|
||||
StartTime time.Time
|
||||
Duration time.Duration
|
||||
RowsRead int64
|
||||
RowsLoaded int64
|
||||
RowsFailed int64
|
||||
Error error
|
||||
}
|
||||
@@ -26,11 +26,11 @@ func processMigrationJob(
|
||||
transformer etl.Transformer,
|
||||
loader etl.Loader,
|
||||
job config.Job,
|
||||
) JobResult {
|
||||
) models.JobResult {
|
||||
localCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
result := JobResult{
|
||||
result := models.JobResult{
|
||||
JobName: job.Name,
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user