2 Commits

8 changed files with 26 additions and 64 deletions

View File

@@ -120,7 +120,7 @@ func processMigrationJobs(
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer() transformer := transformers.NewMssqlTransformer()
loader := loaders.NewPostgresLoader(targetDb) loader := loaders.NewGenericLoader(targetDb)
for i := range maxParallelWorkers { for i := range maxParallelWorkers {
wgJobs.Go(func() { wgJobs.Go(func() {

View File

@@ -1,6 +1,6 @@
max_parallel_workers: 4 max_parallel_workers: 4
source_db_type: sqlserver source_db_type: sqlserver
target_db_type: postgres target_db_type: sqlserver
defaults: defaults:
max_extractors: 2 max_extractors: 2

View File

@@ -43,9 +43,9 @@ func buildExtractQueryMssql(
for i, col := range columns { for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name()) fmt.Fprintf(&sbQuery, "[%s]", col.Name())
if col.Type() == "GEOMETRY" { // if col.Type() == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name()) // fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
} // }
if i < len(columns)-1 { if i < len(columns)-1 {
sbQuery.WriteString(", ") sbQuery.WriteString(", ")

View File

@@ -15,31 +15,21 @@ import (
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
) )
type PostgresLoader struct { type GenericLoader struct {
db dbwrapper.DbWrapper db dbwrapper.DbWrapper
} }
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader { func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: db} return &GenericLoader{db: db}
} }
func mapSlice[T any, V any](input []T, mapper func(T) V) []V { func (gl *GenericLoader) ProcessBatch(
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
colNames []string, colNames []string,
batch models.Batch, batch models.Batch,
) (int, error) { ) (int, error) {
_, err := postgresLd.db.SaveMassive( _, err := gl.db.SaveMassive(
ctx, ctx,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table, tableInfo.Table,
@@ -65,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
return len(batch.Rows), nil return len(batch.Rows), nil
} }
func (postgresLd *PostgresLoader) Exec( func (gl *GenericLoader) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -92,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
return return
} }
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch) processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil { if err != nil {
var ldError *custom_errors.LoaderError var ldError *custom_errors.LoaderError

View File

@@ -1 +0,0 @@
package loaders

View File

@@ -0,0 +1,11 @@
package loaders
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}

View File

@@ -39,6 +39,8 @@ JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%')) WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;` ORDER BY c.column_id;`
// AND c.name NOT LIKE '$%'
type rawColumnMssql struct { type rawColumnMssql struct {
name string name string
userType string userType string

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"sync" "sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
@@ -18,46 +17,7 @@ func NewMssqlTransformer() etl.Transformer {
} }
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan { func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
var plan []etl.ColumnTransformPlan return []etl.ColumnTransformPlan{}
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
} }
const processBatchCtxCheck = 4096 const processBatchCtxCheck = 4096