Compare commits
1 Commits
refactor/r
...
846a49d40c
| Author | SHA1 | Date | |
|---|---|---|---|
|
846a49d40c
|
@@ -120,7 +120,7 @@ func processMigrationJobs(
|
||||
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
||||
extractor := extractors.NewMssqlExtractor(sourceDb)
|
||||
transformer := transformers.NewMssqlTransformer()
|
||||
loader := loaders.NewPostgresLoader(targetDb)
|
||||
loader := loaders.NewGenericLoader(targetDb)
|
||||
|
||||
for i := range maxParallelWorkers {
|
||||
wgJobs.Go(func() {
|
||||
|
||||
@@ -15,31 +15,21 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
type PostgresLoader struct {
|
||||
type GenericLoader struct {
|
||||
db dbwrapper.DbWrapper
|
||||
}
|
||||
|
||||
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
|
||||
return &PostgresLoader{db: db}
|
||||
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
|
||||
return &GenericLoader{db: db}
|
||||
}
|
||||
|
||||
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
|
||||
result := make([]V, len(input))
|
||||
|
||||
for i, v := range input {
|
||||
result[i] = mapper(v)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (postgresLd *PostgresLoader) ProcessBatch(
|
||||
func (gl *GenericLoader) ProcessBatch(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
batch models.Batch,
|
||||
) (int, error) {
|
||||
_, err := postgresLd.db.SaveMassive(
|
||||
_, err := gl.db.SaveMassive(
|
||||
ctx,
|
||||
tableInfo.Schema,
|
||||
tableInfo.Table,
|
||||
@@ -65,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
|
||||
return len(batch.Rows), nil
|
||||
}
|
||||
|
||||
func (postgresLd *PostgresLoader) Exec(
|
||||
func (gl *GenericLoader) Exec(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
columns []models.ColumnType,
|
||||
@@ -92,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
|
||||
return
|
||||
}
|
||||
|
||||
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
|
||||
if err != nil {
|
||||
var ldError *custom_errors.LoaderError
|
||||
@@ -1 +0,0 @@
|
||||
package loaders
|
||||
11
internal/app/etl/loaders/utils.go
Normal file
11
internal/app/etl/loaders/utils.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package loaders
|
||||
|
||||
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
|
||||
result := make([]V, len(input))
|
||||
|
||||
for i, v := range input {
|
||||
result[i] = mapper(v)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
Reference in New Issue
Block a user