1 Commits

11 changed files with 128 additions and 26 deletions

View File

@@ -120,7 +120,7 @@ func processMigrationJobs(
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer()
loader := loaders.NewGenericLoader(targetDb)
loader := loaders.NewPostgresLoader(targetDb)
for i := range maxParallelWorkers {
wgJobs.Go(func() {

View File

@@ -1,6 +1,6 @@
max_parallel_workers: 4
source_db_type: sqlserver
target_db_type: sqlserver
target_db_type: postgres
defaults:
max_extractors: 2

2
go.mod
View File

@@ -15,6 +15,8 @@ require (
)
require (
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect

4
go.sum
View File

@@ -16,6 +16,10 @@ github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZ
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -43,9 +43,9 @@ func buildExtractQueryMssql(
for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
// if col.Type() == "GEOMETRY" {
// fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
// }
if col.Type() == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
}
if i < len(columns)-1 {
sbQuery.WriteString(", ")

View File

@@ -15,21 +15,31 @@ import (
"github.com/jackc/pgx/v5/pgconn"
)
type GenericLoader struct {
type PostgresLoader struct {
db dbwrapper.DbWrapper
}
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &GenericLoader{db: db}
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: db}
}
func (gl *GenericLoader) ProcessBatch(
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int, error) {
_, err := gl.db.SaveMassive(
_, err := postgresLd.db.SaveMassive(
ctx,
tableInfo.Schema,
tableInfo.Table,
@@ -55,7 +65,7 @@ func (gl *GenericLoader) ProcessBatch(
return len(batch.Rows), nil
}
func (gl *GenericLoader) Exec(
func (postgresLd *PostgresLoader) Exec(
ctx context.Context,
tableInfo config.TargetTableInfo,
columns []models.ColumnType,
@@ -82,7 +92,7 @@ func (gl *GenericLoader) Exec(
return
}
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil {
var ldError *custom_errors.LoaderError

View File

@@ -0,0 +1 @@
package loaders

View File

@@ -1,11 +0,0 @@
package loaders
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}

View File

@@ -39,8 +39,6 @@ JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;`
// AND c.name NOT LIKE '$%'
type rawColumnMssql struct {
name string
userType string

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
@@ -17,7 +18,46 @@ func NewMssqlTransformer() etl.Transformer {
}
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
return []etl.ColumnTransformPlan{}
var plan []etl.ColumnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
}
const processBatchCtxCheck = 4096

View File

@@ -0,0 +1,58 @@
package main
import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"github.com/cenkalti/backoff/v5"
)
func ExampleRetry() {
// Define an operation function that returns a value and an error.
// The value can be any type.
// We'll pass this operation to Retry function.
operation := func() (string, error) {
// An example request that may fail.
resp, err := http.Get("http://httpbin.org/get")
if err != nil {
return "", err
}
defer resp.Body.Close()
// If we are being rate limited, return a RetryAfter to specify how long to wait.
// This will also reset the backoff policy.
if resp.StatusCode == 429 {
seconds, err := strconv.ParseInt(resp.Header.Get("Retry-After"), 10, 64)
if err == nil {
return "", backoff.RetryAfter(int(seconds))
}
}
// In case of non-retriable error, return Permanent error to stop retrying.
// For this HTTP example, client errors are non-retriable.
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
return "", backoff.Permanent(errors.New("bad request"))
}
// Return successful response.
return "hello", nil
}
result, err := backoff.Retry(context.TODO(), operation, backoff.WithBackOff(backoff.NewExponentialBackOff()))
if err != nil {
fmt.Println("Error:", err)
return
}
// Operation is successful after retries.
fmt.Println(result)
// Output: hello
}
func main() {
ExampleRetry()
}