Compare commits
2 Commits
refactor/e...846a49d40c

| Author | SHA1 | Date |
|---|---|---|
|  | 846a49d40c |  |
|  | 93b302db8e |  |
```diff
@@ -1,77 +0,0 @@
-package main
-
-import (
-	"context"
-	"database/sql"
-	"errors"
-	"fmt"
-	"sync"
-	"time"
-
-	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
-	"github.com/jackc/pgx/v5/pgxpool"
-	_ "github.com/microsoft/go-mssqldb"
-	log "github.com/sirupsen/logrus"
-)
-
-func connectToSqlServer() (*sql.DB, error) {
-	db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
-	if err != nil {
-		return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
-	defer cancel()
-
-	if err := db.PingContext(ctx); err != nil {
-		return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
-	}
-
-	return db, nil
-}
-
-func connectToPostgres() (*pgxpool.Pool, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
-	defer cancel()
-
-	pool, err := pgxpool.New(ctx, config.App.TargetDbUrl)
-	if err != nil {
-		return nil, fmt.Errorf("Unable to connect to postgres: %w", err)
-	}
-
-	if err := pool.Ping(ctx); err != nil {
-		pool.Close()
-		return nil, fmt.Errorf("Unable to ping postgres: %w", err)
-	}
-
-	return pool, nil
-}
-
-func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
-	var sourceDbErr, targetDbErr error
-	var sourceDb *sql.DB
-	var targetDb *pgxpool.Pool
-	var wg sync.WaitGroup
-
-	wg.Go(func() {
-		sourceDb, sourceDbErr = connectToSqlServer()
-		if sourceDbErr != nil {
-			log.Error("Unable to connect to source db: ", sourceDbErr)
-		}
-	})
-
-	wg.Go(func() {
-		targetDb, targetDbErr = connectToPostgres()
-		if targetDbErr != nil {
-			log.Error("Unable to connect to target db: ", targetDbErr)
-		}
-	})
-
-	wg.Wait()
-
-	if sourceDbErr != nil || targetDbErr != nil {
-		return nil, nil, errors.New("Unable to connect to databases")
-	}
-
-	return sourceDb, targetDb, nil
-}
```
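A note on the deleted helper above: it uses `sync.WaitGroup.Go` (added in Go 1.25), which starts the closure in its own goroutine and handles the `Add`/`Done` bookkeeping, and each goroutine writes to a distinct variable, so `wg.Wait()` alone provides the synchronization needed to read the errors afterwards. A minimal sketch of the same fan-out shape, with hypothetical stand-ins for the two dial functions:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical stand-ins for connectToSqlServer / connectToPostgres.
func dialSource() error { return nil }
func dialTarget() error { return errors.New("target unreachable") }

func main() {
	var wg sync.WaitGroup
	var srcErr, dstErr error

	// Each closure writes a distinct variable, so no mutex is needed;
	// wg.Wait() below establishes the happens-before edge for the reads.
	wg.Go(func() { srcErr = dialSource() })
	wg.Go(func() { dstErr = dialTarget() })
	wg.Wait()

	if srcErr != nil || dstErr != nil {
		fmt.Println("unable to connect to databases")
	}
}
```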
```diff
@@ -11,6 +11,7 @@ import (
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
 )
@@ -95,10 +96,10 @@ func processMigrationJobs(
 	targetDb dbwrapper.DbWrapper,
 	jobs []config.Job,
 	maxParallelWorkers int,
-) []JobResult {
+) []models.JobResult {
 	if len(jobs) == 0 {
 		log.Info("No migration jobs configured")
-		return []JobResult{}
+		return []models.JobResult{}
 	}
 
 	if maxParallelWorkers <= 0 {
@@ -111,7 +112,7 @@ func processMigrationJobs(
 
 	log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
 
-	chJobResults := make(chan JobResult, len(jobs))
+	chJobResults := make(chan models.JobResult, len(jobs))
 	chJobs := make(chan config.Job, len(jobs))
 	var wgJobs sync.WaitGroup
 
@@ -119,7 +120,7 @@ func processMigrationJobs(
 	targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
 	extractor := extractors.NewMssqlExtractor(sourceDb)
 	transformer := transformers.NewMssqlTransformer()
-	loader := loaders.NewPostgresLoader(targetDb)
+	loader := loaders.NewGenericLoader(targetDb)
 
 	for i := range maxParallelWorkers {
 		wgJobs.Go(func() {
@@ -151,7 +152,7 @@ func processMigrationJobs(
 		close(chJobResults)
 	}()
 
-	var finalResults []JobResult
+	var finalResults []models.JobResult
 	for res := range chJobResults {
 		finalResults = append(finalResults, res)
 	}
```
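The function touched above (only partially visible in these hunks) follows a standard buffered-channel worker pool: jobs go onto one channel, `maxParallelWorkers` goroutines drain it, results come back on a second channel, and a final goroutine closes the results channel once every worker exits so the collecting loop terminates. A self-contained sketch of that shape, with hypothetical `job`/`result` types standing in for `config.Job` and `models.JobResult`:

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for config.Job and models.JobResult.
type job struct{ name string }
type result struct{ name string }

func runPool(jobs []job, workers int) []result {
	// Buffers sized to len(jobs) so neither producers nor workers block.
	chJobs := make(chan job, len(jobs))
	chResults := make(chan result, len(jobs))

	var wg sync.WaitGroup
	for range workers {
		wg.Go(func() {
			for j := range chJobs {
				chResults <- result{name: j.name}
			}
		})
	}

	for _, j := range jobs {
		chJobs <- j
	}
	close(chJobs) // lets workers' range loops finish

	// Close the results channel only after every worker has exited,
	// so the range below terminates.
	go func() {
		wg.Wait()
		close(chResults)
	}()

	var out []result
	for r := range chResults {
		out = append(out, r)
	}
	return out
}

func main() {
	fmt.Println(runPool([]job{{"a"}, {"b"}, {"c"}}, 2))
}
```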
```diff
@@ -1,13 +0,0 @@
-package main
-
-import "time"
-
-type JobResult struct {
-	JobName    string
-	StartTime  time.Time
-	Duration   time.Duration
-	RowsRead   int64
-	RowsLoaded int64
-	RowsFailed int64
-	Error      error
-}
```
```diff
@@ -26,11 +26,11 @@ func processMigrationJob(
 	transformer etl.Transformer,
 	loader etl.Loader,
 	job config.Job,
-) JobResult {
+) models.JobResult {
 	localCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	result := JobResult{
+	result := models.JobResult{
 		JobName:   job.Name,
 		StartTime: time.Now(),
 	}
```
```diff
@@ -15,31 +15,21 @@ import (
 	"github.com/jackc/pgx/v5/pgconn"
 )
 
-type PostgresLoader struct {
+type GenericLoader struct {
 	db dbwrapper.DbWrapper
 }
 
-func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
-	return &PostgresLoader{db: db}
+func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
+	return &GenericLoader{db: db}
 }
 
-func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
-	result := make([]V, len(input))
-
-	for i, v := range input {
-		result[i] = mapper(v)
-	}
-
-	return result
-}
-
-func (postgresLd *PostgresLoader) ProcessBatch(
+func (gl *GenericLoader) ProcessBatch(
 	ctx context.Context,
 	tableInfo config.TargetTableInfo,
 	colNames []string,
 	batch models.Batch,
 ) (int, error) {
-	_, err := postgresLd.db.SaveMassive(
+	_, err := gl.db.SaveMassive(
 		ctx,
 		tableInfo.Schema,
 		tableInfo.Table,
@@ -65,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
 	return len(batch.Rows), nil
 }
 
-func (postgresLd *PostgresLoader) Exec(
+func (gl *GenericLoader) Exec(
 	ctx context.Context,
 	tableInfo config.TargetTableInfo,
 	columns []models.ColumnType,
@@ -92,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
 		return
 	}
 
-	processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
+	processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
 
 	if err != nil {
 		var ldError *custom_errors.LoaderError
```
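On the `PostgresLoader` → `GenericLoader` rename: because the constructor already returned the `etl.Loader` interface rather than the concrete struct, callers such as `processMigrationJobs` only needed the constructor call site updated. A minimal sketch of that pattern, with an illustrative interface in place of the project's actual `etl.Loader`:

```go
package main

import "fmt"

// Illustrative stand-in for the etl.Loader interface.
type Loader interface {
	Exec(table string) error
}

type GenericLoader struct{ dsn string }

// Returning the interface keeps the concrete type an implementation
// detail, so it can be renamed without touching callers' types.
func NewGenericLoader(dsn string) Loader {
	return &GenericLoader{dsn: dsn}
}

func (gl *GenericLoader) Exec(table string) error {
	fmt.Println("loading", table, "via", gl.dsn)
	return nil
}

func main() {
	var l Loader = NewGenericLoader("postgres://localhost/db")
	_ = l.Exec("public.users")
}
```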
```diff
@@ -1 +0,0 @@
-package loaders
```
internal/app/etl/loaders/utils.go (new file, +11 lines)
```diff
@@ -0,0 +1,11 @@
+package loaders
+
+func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
+	result := make([]V, len(input))
+
+	for i, v := range input {
+		result[i] = mapper(v)
+	}
+
+	return result
+}
```
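The extracted `mapSlice` helper is a plain generic map over a slice. A short usage sketch (the helper body is copied from the diff; the caller is hypothetical):

```go
package main

import (
	"fmt"
	"strconv"
)

// Same helper as in internal/app/etl/loaders/utils.go.
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
	result := make([]V, len(input))
	for i, v := range input {
		result[i] = mapper(v)
	}
	return result
}

func main() {
	ids := []int{1, 2, 3}
	// Convert each id to its decimal string form.
	fmt.Println(mapSlice(ids, strconv.Itoa)) // [1 2 3]
}
```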
```diff
@@ -1,6 +1,10 @@
 package models
 
-import "github.com/google/uuid"
+import (
+	"time"
+
+	"github.com/google/uuid"
+)
 
 type UnknownRowValues = []any
 
@@ -25,3 +29,13 @@ type Partition struct {
 	HasRange     bool
 	RetryCounter int
 }
+
+type JobResult struct {
+	JobName    string
+	StartTime  time.Time
+	Duration   time.Duration
+	RowsRead   int64
+	RowsLoaded int64
+	RowsFailed int64
+	Error      error
+}
```
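With `JobResult` moved from `package main` into `models`, worker and reporting code can share the type. A hypothetical consumer summarizing results (the struct is copied from the diff; the loop is illustrative only):

```go
package main

import (
	"fmt"
	"time"
)

// Same shape as models.JobResult in the diff above.
type JobResult struct {
	JobName    string
	StartTime  time.Time
	Duration   time.Duration
	RowsRead   int64
	RowsLoaded int64
	RowsFailed int64
	Error      error
}

func main() {
	results := []JobResult{
		{JobName: "orders", RowsRead: 100, RowsLoaded: 100, Duration: 2 * time.Second},
		{JobName: "users", RowsRead: 50, RowsFailed: 50, Error: fmt.Errorf("type mismatch")},
	}

	for _, r := range results {
		status := "ok"
		if r.Error != nil {
			status = "failed: " + r.Error.Error()
		}
		fmt.Printf("%-8s read=%d loaded=%d failed=%d in %s (%s)\n",
			r.JobName, r.RowsRead, r.RowsLoaded, r.RowsFailed, r.Duration, status)
	}
}
```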