Compare commits
1 Commits
refactor/r
...
846a49d40c
| Author | SHA1 | Date | |
|---|---|---|---|
|
846a49d40c
|
@@ -120,7 +120,7 @@ func processMigrationJobs(
|
|||||||
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
||||||
extractor := extractors.NewMssqlExtractor(sourceDb)
|
extractor := extractors.NewMssqlExtractor(sourceDb)
|
||||||
transformer := transformers.NewMssqlTransformer()
|
transformer := transformers.NewMssqlTransformer()
|
||||||
loader := loaders.NewPostgresLoader(targetDb)
|
loader := loaders.NewGenericLoader(targetDb)
|
||||||
|
|
||||||
for i := range maxParallelWorkers {
|
for i := range maxParallelWorkers {
|
||||||
wgJobs.Go(func() {
|
wgJobs.Go(func() {
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -15,8 +15,6 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
|
|
||||||
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
|
|
||||||
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
|
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
|
||||||
github.com/golang-sql/sqlexp v0.1.0 // indirect
|
github.com/golang-sql/sqlexp v0.1.0 // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -16,10 +16,6 @@ github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZ
|
|||||||
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
|
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
|
||||||
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
|
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
|
||||||
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
|
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
|
||||||
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
|
|
||||||
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
|
||||||
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
|
|
||||||
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
|
||||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
|||||||
@@ -15,31 +15,21 @@ import (
|
|||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PostgresLoader struct {
|
type GenericLoader struct {
|
||||||
db dbwrapper.DbWrapper
|
db dbwrapper.DbWrapper
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
|
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
|
||||||
return &PostgresLoader{db: db}
|
return &GenericLoader{db: db}
|
||||||
}
|
}
|
||||||
|
|
||||||
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
|
func (gl *GenericLoader) ProcessBatch(
|
||||||
result := make([]V, len(input))
|
|
||||||
|
|
||||||
for i, v := range input {
|
|
||||||
result[i] = mapper(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func (postgresLd *PostgresLoader) ProcessBatch(
|
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.TargetTableInfo,
|
tableInfo config.TargetTableInfo,
|
||||||
colNames []string,
|
colNames []string,
|
||||||
batch models.Batch,
|
batch models.Batch,
|
||||||
) (int, error) {
|
) (int, error) {
|
||||||
_, err := postgresLd.db.SaveMassive(
|
_, err := gl.db.SaveMassive(
|
||||||
ctx,
|
ctx,
|
||||||
tableInfo.Schema,
|
tableInfo.Schema,
|
||||||
tableInfo.Table,
|
tableInfo.Table,
|
||||||
@@ -65,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
|
|||||||
return len(batch.Rows), nil
|
return len(batch.Rows), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (postgresLd *PostgresLoader) Exec(
|
func (gl *GenericLoader) Exec(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.TargetTableInfo,
|
tableInfo config.TargetTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
@@ -92,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
|
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var ldError *custom_errors.LoaderError
|
var ldError *custom_errors.LoaderError
|
||||||
@@ -1 +0,0 @@
|
|||||||
package loaders
|
|
||||||
11
internal/app/etl/loaders/utils.go
Normal file
11
internal/app/etl/loaders/utils.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package loaders
|
||||||
|
|
||||||
|
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
|
||||||
|
result := make([]V, len(input))
|
||||||
|
|
||||||
|
for i, v := range input {
|
||||||
|
result[i] = mapper(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
@@ -1,58 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/cenkalti/backoff/v5"
|
|
||||||
)
|
|
||||||
|
|
||||||
func ExampleRetry() {
|
|
||||||
// Define an operation function that returns a value and an error.
|
|
||||||
// The value can be any type.
|
|
||||||
// We'll pass this operation to Retry function.
|
|
||||||
operation := func() (string, error) {
|
|
||||||
// An example request that may fail.
|
|
||||||
resp, err := http.Get("http://httpbin.org/get")
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
// If we are being rate limited, return a RetryAfter to specify how long to wait.
|
|
||||||
// This will also reset the backoff policy.
|
|
||||||
if resp.StatusCode == 429 {
|
|
||||||
seconds, err := strconv.ParseInt(resp.Header.Get("Retry-After"), 10, 64)
|
|
||||||
if err == nil {
|
|
||||||
return "", backoff.RetryAfter(int(seconds))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// In case of non-retriable error, return Permanent error to stop retrying.
|
|
||||||
// For this HTTP example, client errors are non-retriable.
|
|
||||||
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
|
|
||||||
return "", backoff.Permanent(errors.New("bad request"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return successful response.
|
|
||||||
return "hello", nil
|
|
||||||
}
|
|
||||||
|
|
||||||
result, err := backoff.Retry(context.TODO(), operation, backoff.WithBackOff(backoff.NewExponentialBackOff()))
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Error:", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Operation is successful after retries.
|
|
||||||
|
|
||||||
fmt.Println(result)
|
|
||||||
// Output: hello
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
ExampleRetry()
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user