feat: implement GenericLoader for batch processing and utility functions

This commit is contained in:
2026-04-17 15:58:08 -05:00
parent 93b302db8e
commit 846a49d40c
4 changed files with 19 additions and 19 deletions

View File

@@ -120,7 +120,7 @@ func processMigrationJobs(
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb) targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb) extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer() transformer := transformers.NewMssqlTransformer()
loader := loaders.NewPostgresLoader(targetDb) loader := loaders.NewGenericLoader(targetDb)
for i := range maxParallelWorkers { for i := range maxParallelWorkers {
wgJobs.Go(func() { wgJobs.Go(func() {

View File

@@ -15,31 +15,21 @@ import (
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
) )
type PostgresLoader struct { type GenericLoader struct {
db dbwrapper.DbWrapper db dbwrapper.DbWrapper
} }
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader { func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: db} return &GenericLoader{db: db}
} }
func mapSlice[T any, V any](input []T, mapper func(T) V) []V { func (gl *GenericLoader) ProcessBatch(
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
colNames []string, colNames []string,
batch models.Batch, batch models.Batch,
) (int, error) { ) (int, error) {
_, err := postgresLd.db.SaveMassive( _, err := gl.db.SaveMassive(
ctx, ctx,
tableInfo.Schema, tableInfo.Schema,
tableInfo.Table, tableInfo.Table,
@@ -65,7 +55,7 @@ func (postgresLd *PostgresLoader) ProcessBatch(
return len(batch.Rows), nil return len(batch.Rows), nil
} }
func (postgresLd *PostgresLoader) Exec( func (gl *GenericLoader) Exec(
ctx context.Context, ctx context.Context,
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
@@ -92,7 +82,7 @@ func (postgresLd *PostgresLoader) Exec(
return return
} }
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch) processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil { if err != nil {
var ldError *custom_errors.LoaderError var ldError *custom_errors.LoaderError

View File

@@ -1 +0,0 @@
package loaders

View File

@@ -0,0 +1,11 @@
package loaders
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}