feat: implement Postgres loader and refactor migration job processing

This commit is contained in:
2026-04-10 20:40:01 -05:00
parent 053e6bd673
commit c1bae79f98
5 changed files with 172 additions and 202 deletions

View File

@@ -10,6 +10,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractor"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loader"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformer"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgxpool"
@@ -63,6 +64,7 @@ func processMigrationJob(
mssqlExtractor := extractor.NewMssqlExtractor(sourceDb)
mssqlToPostgresTransformer := transformer.NewMssqlTransformer()
postgresLoader := loader.NewPostgresLoader(targetDb)
go func() {
if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
@@ -120,7 +122,16 @@ func processMigrationJob(
for range job.MaxLoaders {
wgLoaders.Go(func() {
loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks, &rowsLoaded)
postgresLoader.Exec(
jobCtx,
job.TargetTable,
targetColTypes,
chChunksTransformed,
chLoadersErrors,
chJobErrors,
&wgActiveChunks,
&rowsLoaded,
)
})
}