1 Commits

Author SHA1 Message Date
DiegoAlessandroMotta
1c3db39b21 update extractor interface 2026-04-16 23:49:23 -05:00
14 changed files with 239 additions and 70 deletions

77
cmd/go_migrate/connect.go Normal file
View File

@@ -0,0 +1,77 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func connectToSqlServer() (*sql.DB, error) {
db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
}
return db, nil
}
func connectToPostgres() (*pgxpool.Pool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
pool, err := pgxpool.New(ctx, config.App.TargetDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("Unable to ping postgres: %w", err)
}
return pool, nil
}
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
var sourceDbErr, targetDbErr error
var sourceDb *sql.DB
var targetDb *pgxpool.Pool
var wg sync.WaitGroup
wg.Go(func() {
sourceDb, sourceDbErr = connectToSqlServer()
if sourceDbErr != nil {
log.Error("Unable to connect to source db: ", sourceDbErr)
}
})
wg.Go(func() {
targetDb, targetDbErr = connectToPostgres()
if targetDbErr != nil {
log.Error("Unable to connect to target db: ", targetDbErr)
}
})
wg.Wait()
if sourceDbErr != nil || targetDbErr != nil {
return nil, nil, errors.New("Unable to connect to databases")
}
return sourceDb, targetDb, nil
}

View File

@@ -11,7 +11,6 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
@@ -96,10 +95,10 @@ func processMigrationJobs(
targetDb dbwrapper.DbWrapper,
jobs []config.Job,
maxParallelWorkers int,
) []models.JobResult {
) []JobResult {
if len(jobs) == 0 {
log.Info("No migration jobs configured")
return []models.JobResult{}
return []JobResult{}
}
if maxParallelWorkers <= 0 {
@@ -112,7 +111,7 @@ func processMigrationJobs(
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan models.JobResult, len(jobs))
chJobResults := make(chan JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup
@@ -120,7 +119,7 @@ func processMigrationJobs(
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer()
loader := loaders.NewGenericLoader(targetDb)
loader := loaders.NewPostgresLoader(targetDb)
for i := range maxParallelWorkers {
wgJobs.Go(func() {
@@ -152,7 +151,7 @@ func processMigrationJobs(
close(chJobResults)
}()
var finalResults []models.JobResult
var finalResults []JobResult
for res := range chJobResults {
finalResults = append(finalResults, res)
}

13
cmd/go_migrate/metrics.go Normal file
View File

@@ -0,0 +1,13 @@
package main
import "time"
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}

View File

@@ -26,11 +26,11 @@ func processMigrationJob(
transformer etl.Transformer,
loader etl.Loader,
job config.Job,
) models.JobResult {
) JobResult {
localCtx, cancel := context.WithCancel(ctx)
defer cancel()
result := models.JobResult{
result := JobResult{
JobName: job.Name,
StartTime: time.Now(),
}
@@ -129,7 +129,7 @@ func processMigrationJob(
for range maxExtractors {
wgExtractors.Go(func() {
extractor.Exec(
extractor.Consume(
localCtx,
job.SourceTable,
sourceColTypes,

View File

@@ -1,6 +1,6 @@
max_parallel_workers: 4
source_db_type: sqlserver
target_db_type: sqlserver
target_db_type: postgres
defaults:
max_extractors: 2

View File

@@ -9,6 +9,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
@@ -43,9 +44,9 @@ func buildExtractQueryMssql(
for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
// if col.Type() == "GEOMETRY" {
// fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
// }
if col.Type() == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
}
if i < len(columns)-1 {
sbQuery.WriteString(", ")
@@ -99,7 +100,7 @@ func errorFromLastRow(
}
}
func (mssqlEx *MssqlExtractor) ProcessPartition(
func (mssqlEx *MssqlExtractor) Extract(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
@@ -107,7 +108,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error) {
) (int64, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
var queryArgs []any
@@ -118,7 +119,7 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
)
}
rowsRead := 0
var rowsRead int64 = 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
@@ -189,14 +190,67 @@ func (mssqlEx *MssqlExtractor) ProcessPartition(
return rowsRead, nil
}
func (mssqlEx *MssqlExtractor) Exec(
func (mssqlEx *MssqlExtractor) ExtractWithRetries(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
var fatalErr error
delay := time.Duration(time.Second * 1)
currentParitition := partition
for fatalErr != nil || currentParitition.RetryCounter < 3 {
currentParitition.RetryCounter++
rowsRead, err := mssqlEx.Extract(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
if rowsRead > 0 {
totalRowsRead += int64(rowsRead)
}
if err != nil {
var exError *custom_errors.ExtractorError
if errors.As(err, &exError) {
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
time.Sleep(delay)
} else {
fatalErr = err
}
continue
}
break
}
return totalRowsRead, fatalErr
}
func (mssqlEx *MssqlExtractor) Consume(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
@@ -231,7 +285,7 @@ func (mssqlEx *MssqlExtractor) Exec(
return
}
rowsReadResult, err := mssqlEx.ProcessPartition(
rowsReadResult, err := mssqlEx.ExtractWithRetries(
ctx,
tableInfo,
columns,
@@ -246,15 +300,8 @@ func (mssqlEx *MssqlExtractor) Exec(
}
if err != nil {
var exError *custom_errors.ExtractorError
var jobError *custom_errors.JobError
if errors.As(err, &exError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if errors.As(err, &jobError) {
if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
@@ -264,7 +311,7 @@ func (mssqlEx *MssqlExtractor) Exec(
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}

View File

@@ -51,7 +51,7 @@ func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []mo
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
}
func (postgresEx *PostgresExtractor) ProcessPartition(
func (postgresEx *PostgresExtractor) Extract(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
@@ -110,7 +110,7 @@ func (postgresEx *PostgresExtractor) ProcessPartition(
return rowsRead, nil
}
func (postgresEx *PostgresExtractor) Exec(
func (postgresEx *PostgresExtractor) Consume(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,

View File

@@ -15,21 +15,31 @@ import (
"github.com/jackc/pgx/v5/pgconn"
)
type GenericLoader struct {
type PostgresLoader struct {
db dbwrapper.DbWrapper
}
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &GenericLoader{db: db}
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: db}
}
func (gl *GenericLoader) ProcessBatch(
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int, error) {
_, err := gl.db.SaveMassive(
_, err := postgresLd.db.SaveMassive(
ctx,
tableInfo.Schema,
tableInfo.Table,
@@ -55,7 +65,7 @@ func (gl *GenericLoader) ProcessBatch(
return len(batch.Rows), nil
}
func (gl *GenericLoader) Exec(
func (postgresLd *PostgresLoader) Exec(
ctx context.Context,
tableInfo config.TargetTableInfo,
columns []models.ColumnType,
@@ -82,7 +92,7 @@ func (gl *GenericLoader) Exec(
return
}
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil {
var ldError *custom_errors.LoaderError

View File

@@ -0,0 +1 @@
package loaders

View File

@@ -1,11 +0,0 @@
package loaders
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}

View File

@@ -39,8 +39,6 @@ JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;`
// AND c.name NOT LIKE '$%'
type rawColumnMssql struct {
name string
userType string

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
@@ -17,7 +18,46 @@ func NewMssqlTransformer() etl.Transformer {
}
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
return []etl.ColumnTransformPlan{}
var plan []etl.ColumnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
}
const processBatchCtxCheck = 4096

View File

@@ -10,7 +10,7 @@ import (
)
type Extractor interface {
ProcessPartition(
Extract(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
@@ -18,16 +18,25 @@ type Extractor interface {
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error)
) (int64, error)
Exec(
ExtractWithRetries(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error)
Consume(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,

View File

@@ -1,10 +1,6 @@
package models
import (
"time"
"github.com/google/uuid"
)
import "github.com/google/uuid"
type UnknownRowValues = []any
@@ -29,13 +25,3 @@ type Partition struct {
HasRange bool
RetryCounter int
}
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}