2 Commits

8 changed files with 41 additions and 116 deletions

View File

@@ -1,77 +0,0 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
// connectToSqlServer opens a database/sql handle for the source SQL Server
// using config.App.SourceDbUrl and verifies it with a ping bounded by a
// 20-second timeout.
//
// It returns the ready-to-use handle, or a wrapped error if the handle could
// not be created or the ping failed. On a failed ping the handle is closed
// before returning so that no resources held by it are leaked.
func connectToSqlServer() (*sql.DB, error) {
	db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
	if err != nil {
		return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	if err := db.PingContext(ctx); err != nil {
		// The caller only receives nil on failure, so this is the last chance
		// to release the handle. The close error is deliberately ignored: the
		// ping failure is the error worth reporting.
		_ = db.Close()
		return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
	}

	return db, nil
}
// connectToPostgres creates a pgx connection pool for the target database
// from config.App.TargetDbUrl and pings it. Pool creation and the ping share
// a single 20-second timeout.
//
// It returns the pool on success. If the ping fails the pool is closed and a
// wrapped error is returned instead.
func connectToPostgres() (*pgxpool.Pool, error) {
	dialCtx, stop := context.WithTimeout(context.Background(), 20*time.Second)
	defer stop()

	pg, openErr := pgxpool.New(dialCtx, config.App.TargetDbUrl)
	if openErr != nil {
		return nil, fmt.Errorf("Unable to connect to postgres: %w", openErr)
	}

	pingErr := pg.Ping(dialCtx)
	if pingErr == nil {
		return pg, nil
	}

	// The pool is unusable to the caller at this point; release it.
	pg.Close()
	return nil, fmt.Errorf("Unable to ping postgres: %w", pingErr)
}
// connectToDatabases connects to the source (SQL Server) and target
// (Postgres) databases concurrently and waits for both attempts to finish.
//
// Each failed attempt is logged with its cause. If either attempt fails, a
// generic error is returned together with nil handles; any connection that
// did succeed is closed first, because the caller never receives it and
// would otherwise have no way to release it.
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
	var sourceDbErr, targetDbErr error
	var sourceDb *sql.DB
	var targetDb *pgxpool.Pool
	var wg sync.WaitGroup

	// Each goroutine writes only its own pair of variables, and wg.Wait
	// orders those writes before the reads below.
	wg.Go(func() {
		sourceDb, sourceDbErr = connectToSqlServer()
		if sourceDbErr != nil {
			log.Error("Unable to connect to source db: ", sourceDbErr)
		}
	})
	wg.Go(func() {
		targetDb, targetDbErr = connectToPostgres()
		if targetDbErr != nil {
			log.Error("Unable to connect to target db: ", targetDbErr)
		}
	})
	wg.Wait()

	if sourceDbErr != nil || targetDbErr != nil {
		// Release whichever side did connect before dropping the reference.
		if sourceDb != nil {
			// Best-effort cleanup; the connection failure is what gets reported.
			_ = sourceDb.Close()
		}
		if targetDb != nil {
			targetDb.Close()
		}
		return nil, nil, errors.New("Unable to connect to databases")
	}

	return sourceDb, targetDb, nil
}

View File

@@ -11,6 +11,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@@ -95,10 +96,10 @@ func processMigrationJobs(
targetDb dbwrapper.DbWrapper, targetDb dbwrapper.DbWrapper,
jobs []config.Job, jobs []config.Job,
maxParallelWorkers int, maxParallelWorkers int,
) []JobResult { ) []models.JobResult {
if len(jobs) == 0 { if len(jobs) == 0 {
log.Info("No migration jobs configured") log.Info("No migration jobs configured")
return []JobResult{} return []models.JobResult{}
} }
if maxParallelWorkers <= 0 { if maxParallelWorkers <= 0 {
@@ -111,7 +112,7 @@ func processMigrationJobs(
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers) log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan JobResult, len(jobs)) chJobResults := make(chan models.JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs)) chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup var wgJobs sync.WaitGroup
@@ -119,7 +120,7 @@ func processMigrationJobs(
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer() transformer := transformers.NewMssqlTransformer()
loader := loaders.NewPostgresLoader(targetDb) loader := loaders.NewGenericLoader(targetDb)
for i := range maxParallelWorkers { for i := range maxParallelWorkers {
wgJobs.Go(func() { wgJobs.Go(func() {
@@ -151,7 +152,7 @@ func processMigrationJobs(
close(chJobResults) close(chJobResults)
}() }()
var finalResults []JobResult var finalResults []models.JobResult
for res := range chJobResults { for res := range chJobResults {
finalResults = append(finalResults, res) finalResults = append(finalResults, res)
} }

View File

@@ -1,13 +0,0 @@
package main
import "time"
// JobResult carries the outcome of a single migration job.
type JobResult struct {
	// JobName identifies the job this result belongs to.
	JobName string
	// StartTime is the instant recorded when the result was created.
	StartTime time.Time
	// Duration is presumably the elapsed run time of the job; confirm at the
	// site that populates it.
	Duration time.Duration
	// Row counters. The names suggest rows read from the source, rows written
	// to the target and rows that failed; verify against the code that
	// updates them.
	RowsRead, RowsLoaded, RowsFailed int64
	// Error holds the error associated with the job, nil when none was set.
	Error error
}

View File

@@ -26,11 +26,11 @@ func processMigrationJob(
transformer etl.Transformer, transformer etl.Transformer,
loader etl.Loader, loader etl.Loader,
job config.Job, job config.Job,
) JobResult { ) models.JobResult {
localCtx, cancel := context.WithCancel(ctx) localCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
result := JobResult{ result := models.JobResult{
JobName: job.Name, JobName: job.Name,
StartTime: time.Now(), StartTime: time.Now(),
} }

View File

@@ -15,31 +15,21 @@ import (
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
) )
type PostgresLoader struct { type GenericLoader struct {
db dbwrapper.DbWrapper db dbwrapper.DbWrapper
} }
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader { func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: db} return &GenericLoader{db: db}
} }
func mapSlice[T any, V any](input []T, mapper func(T) V) []V { func (gl *GenericLoader) ProcessBatch(
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
colNames []string, colNames []string,
batch models.Batch, batch models.Batch,
) (int, error) { ) (int, error) {
_, err := postgresLd.db.SaveMassive( _, err := gl.db.SaveMassive(
ctx, ctx,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table, tableInfo.Table,
@@ -65,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
return len(batch.Rows), nil return len(batch.Rows), nil
} }
func (postgresLd *PostgresLoader) Exec( func (gl *GenericLoader) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -92,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
return return
} }
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch) processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil { if err != nil {
var ldError *custom_errors.LoaderError var ldError *custom_errors.LoaderError

View File

@@ -1 +0,0 @@
package loaders

View File

@@ -0,0 +1,11 @@
package loaders
// mapSlice applies mapper to every element of input, in order, and returns
// the mapped values in a newly allocated slice of the same length. The
// returned slice is never nil, even when input is nil or empty.
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
	mapped := make([]V, 0, len(input))
	for idx := range input {
		mapped = append(mapped, mapper(input[idx]))
	}
	return mapped
}

View File

@@ -1,6 +1,10 @@
package models package models
import "github.com/google/uuid" import (
"time"
"github.com/google/uuid"
)
type UnknownRowValues = []any type UnknownRowValues = []any
@@ -25,3 +29,13 @@ type Partition struct {
HasRange bool HasRange bool
RetryCounter int RetryCounter int
} }
// JobResult carries the outcome of a single migration job.
type JobResult struct {
	// JobName identifies the job this result belongs to.
	JobName string

	// StartTime and Duration are timing fields. Duration is presumably the
	// elapsed run time measured from StartTime; confirm at the site that
	// populates it.
	StartTime time.Time
	Duration  time.Duration

	// Row counters. The names suggest rows read from the source, rows written
	// to the target and rows that failed; verify against the code that
	// updates them.
	RowsRead, RowsLoaded, RowsFailed int64

	// Error holds the error associated with the job, nil when none was set.
	Error error
}