Compare commits
2 Commits
f65e67e02f
...
feat/mssql
| Author | SHA1 | Date | |
|---|---|---|---|
|
63cf26e1ab
|
|||
|
846a49d40c
|
@@ -120,7 +120,7 @@ func processMigrationJobs(
|
|||||||
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
||||||
extractor := extractors.NewMssqlExtractor(sourceDb)
|
extractor := extractors.NewMssqlExtractor(sourceDb)
|
||||||
transformer := transformers.NewMssqlTransformer()
|
transformer := transformers.NewMssqlTransformer()
|
||||||
loader := loaders.NewPostgresLoader(targetDb)
|
loader := loaders.NewGenericLoader(targetDb)
|
||||||
|
|
||||||
for i := range maxParallelWorkers {
|
for i := range maxParallelWorkers {
|
||||||
wgJobs.Go(func() {
|
wgJobs.Go(func() {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
max_parallel_workers: 4
|
max_parallel_workers: 4
|
||||||
source_db_type: sqlserver
|
source_db_type: sqlserver
|
||||||
target_db_type: postgres
|
target_db_type: sqlserver
|
||||||
|
|
||||||
defaults:
|
defaults:
|
||||||
max_extractors: 2
|
max_extractors: 2
|
||||||
|
|||||||
@@ -43,9 +43,9 @@ func buildExtractQueryMssql(
|
|||||||
for i, col := range columns {
|
for i, col := range columns {
|
||||||
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
|
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
|
||||||
|
|
||||||
if col.Type() == "GEOMETRY" {
|
// if col.Type() == "GEOMETRY" {
|
||||||
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
|
// fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
|
||||||
}
|
// }
|
||||||
|
|
||||||
if i < len(columns)-1 {
|
if i < len(columns)-1 {
|
||||||
sbQuery.WriteString(", ")
|
sbQuery.WriteString(", ")
|
||||||
|
|||||||
@@ -15,31 +15,21 @@ import (
|
|||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PostgresLoader struct {
|
type GenericLoader struct {
|
||||||
db dbwrapper.DbWrapper
|
db dbwrapper.DbWrapper
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
|
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
|
||||||
return &PostgresLoader{db: db}
|
return &GenericLoader{db: db}
|
||||||
}
|
}
|
||||||
|
|
||||||
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
|
func (gl *GenericLoader) ProcessBatch(
|
||||||
result := make([]V, len(input))
|
|
||||||
|
|
||||||
for i, v := range input {
|
|
||||||
result[i] = mapper(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func (postgresLd *PostgresLoader) ProcessBatch(
|
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.TargetTableInfo,
|
tableInfo config.TargetTableInfo,
|
||||||
colNames []string,
|
colNames []string,
|
||||||
batch models.Batch,
|
batch models.Batch,
|
||||||
) (int, error) {
|
) (int, error) {
|
||||||
_, err := postgresLd.db.SaveMassive(
|
_, err := gl.db.SaveMassive(
|
||||||
ctx,
|
ctx,
|
||||||
tableInfo.Schema,
|
tableInfo.Schema,
|
||||||
tableInfo.Table,
|
tableInfo.Table,
|
||||||
@@ -65,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
|
|||||||
return len(batch.Rows), nil
|
return len(batch.Rows), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (postgresLd *PostgresLoader) Exec(
|
func (gl *GenericLoader) Exec(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.TargetTableInfo,
|
tableInfo config.TargetTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
@@ -92,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
|
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var ldError *custom_errors.LoaderError
|
var ldError *custom_errors.LoaderError
|
||||||
@@ -1 +0,0 @@
|
|||||||
package loaders
|
|
||||||
11
internal/app/etl/loaders/utils.go
Normal file
11
internal/app/etl/loaders/utils.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package loaders
|
||||||
|
|
||||||
|
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
|
||||||
|
result := make([]V, len(input))
|
||||||
|
|
||||||
|
for i, v := range input {
|
||||||
|
result[i] = mapper(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
@@ -39,6 +39,8 @@ JOIN sys.schemas s ON st.schema_id = s.schema_id
|
|||||||
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
|
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
|
||||||
ORDER BY c.column_id;`
|
ORDER BY c.column_id;`
|
||||||
|
|
||||||
|
// AND c.name NOT LIKE '$%'
|
||||||
|
|
||||||
type rawColumnMssql struct {
|
type rawColumnMssql struct {
|
||||||
name string
|
name string
|
||||||
userType string
|
userType string
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
@@ -18,46 +17,7 @@ func NewMssqlTransformer() etl.Transformer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
||||||
var plan []etl.ColumnTransformPlan
|
return []etl.ColumnTransformPlan{}
|
||||||
|
|
||||||
for i, col := range columns {
|
|
||||||
switch col.SystemType() {
|
|
||||||
case "uniqueidentifier":
|
|
||||||
plan = append(plan, etl.ColumnTransformPlan{
|
|
||||||
Index: i,
|
|
||||||
Fn: func(v any) (any, error) {
|
|
||||||
if b, ok := v.([]byte); ok && b != nil {
|
|
||||||
return mssqlUuidToBigEndian(b)
|
|
||||||
}
|
|
||||||
return v, nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
case "geometry", "geography":
|
|
||||||
plan = append(plan, etl.ColumnTransformPlan{
|
|
||||||
Index: i,
|
|
||||||
Fn: func(v any) (any, error) {
|
|
||||||
if b, ok := v.([]byte); ok && b != nil {
|
|
||||||
return wkbToEwkbWithSrid(b, 4326)
|
|
||||||
}
|
|
||||||
return v, nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
case "datetime", "datetime2":
|
|
||||||
plan = append(plan, etl.ColumnTransformPlan{
|
|
||||||
Index: i,
|
|
||||||
Fn: func(v any) (any, error) {
|
|
||||||
if t, ok := v.(time.Time); ok {
|
|
||||||
return ensureUTC(t), nil
|
|
||||||
}
|
|
||||||
return v, nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return plan
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const processBatchCtxCheck = 4096
|
const processBatchCtxCheck = 4096
|
||||||
|
|||||||
Reference in New Issue
Block a user