2 Commits

11 changed files with 26 additions and 128 deletions

View File

@@ -120,7 +120,7 @@ func processMigrationJobs(
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer() transformer := transformers.NewMssqlTransformer()
loader := loaders.NewPostgresLoader(targetDb) loader := loaders.NewGenericLoader(targetDb)
for i := range maxParallelWorkers { for i := range maxParallelWorkers {
wgJobs.Go(func() { wgJobs.Go(func() {

View File

@@ -1,6 +1,6 @@
max_parallel_workers: 4 max_parallel_workers: 4
source_db_type: sqlserver source_db_type: sqlserver
target_db_type: postgres target_db_type: sqlserver
defaults: defaults:
max_extractors: 2 max_extractors: 2

2
go.mod
View File

@@ -15,8 +15,6 @@ require (
) )
require ( require (
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect

4
go.sum
View File

@@ -16,10 +16,6 @@ github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZ
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -43,9 +43,9 @@ func buildExtractQueryMssql(
for i, col := range columns { for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name()) fmt.Fprintf(&sbQuery, "[%s]", col.Name())
if col.Type() == "GEOMETRY" { // if col.Type() == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name()) // fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
} // }
if i < len(columns)-1 { if i < len(columns)-1 {
sbQuery.WriteString(", ") sbQuery.WriteString(", ")

View File

@@ -15,31 +15,21 @@ import (
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
) )
type PostgresLoader struct { type GenericLoader struct {
db dbwrapper.DbWrapper db dbwrapper.DbWrapper
} }
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader { func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: db} return &GenericLoader{db: db}
} }
func mapSlice[T any, V any](input []T, mapper func(T) V) []V { func (gl *GenericLoader) ProcessBatch(
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
colNames []string, colNames []string,
batch models.Batch, batch models.Batch,
) (int, error) { ) (int, error) {
_, err := postgresLd.db.SaveMassive( _, err := gl.db.SaveMassive(
ctx, ctx,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table, tableInfo.Table,
@@ -65,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
return len(batch.Rows), nil return len(batch.Rows), nil
} }
func (postgresLd *PostgresLoader) Exec( func (gl *GenericLoader) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -92,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
return return
} }
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch) processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil { if err != nil {
var ldError *custom_errors.LoaderError var ldError *custom_errors.LoaderError

View File

@@ -1 +0,0 @@
package loaders

View File

@@ -0,0 +1,11 @@
package loaders
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}

View File

@@ -39,6 +39,8 @@ JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%')) WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;` ORDER BY c.column_id;`
// AND c.name NOT LIKE '$%'
type rawColumnMssql struct { type rawColumnMssql struct {
name string name string
userType string userType string

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"sync" "sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
@@ -18,46 +17,7 @@ func NewMssqlTransformer() etl.Transformer {
} }
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan { func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
var plan []etl.ColumnTransformPlan return []etl.ColumnTransformPlan{}
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
} }
const processBatchCtxCheck = 4096 const processBatchCtxCheck = 4096

View File

@@ -1,58 +0,0 @@
package main
import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"github.com/cenkalti/backoff/v5"
)
func ExampleRetry() {
// Define an operation function that returns a value and an error.
// The value can be any type.
// We'll pass this operation to Retry function.
operation := func() (string, error) {
// An example request that may fail.
resp, err := http.Get("http://httpbin.org/get")
if err != nil {
return "", err
}
defer resp.Body.Close()
// If we are being rate limited, return a RetryAfter to specify how long to wait.
// This will also reset the backoff policy.
if resp.StatusCode == 429 {
seconds, err := strconv.ParseInt(resp.Header.Get("Retry-After"), 10, 64)
if err == nil {
return "", backoff.RetryAfter(int(seconds))
}
}
// In case of non-retriable error, return Permanent error to stop retrying.
// For this HTTP example, client errors are non-retriable.
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
return "", backoff.Permanent(errors.New("bad request"))
}
// Return successful response.
return "hello", nil
}
result, err := backoff.Retry(context.TODO(), operation, backoff.WithBackOff(backoff.NewExponentialBackOff()))
if err != nil {
fmt.Println("Error:", err)
return
}
// Operation is successful after retries.
fmt.Println(result)
// Output: hello
}
func main() {
ExampleRetry()
}