feat: implement database wrapper interfaces for MSSQL and Postgres; enhance migration job processing with pre and post SQL execution

This commit is contained in:
2026-04-13 07:57:18 -05:00
parent 85074da2ec
commit 33c9cd9c3e
7 changed files with 101 additions and 13 deletions

View File

@@ -8,6 +8,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
@@ -17,6 +18,8 @@ import (
func processMigrationJob(
ctx context.Context,
// sourceDbWrapper db.DbWrapper,
targetDbWrapper db.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor,
@@ -24,7 +27,7 @@ func processMigrationJob(
loader etl.Loader,
job config.Job,
) JobResult {
jobCtx, cancel := context.WithCancel(ctx)
localCtx, cancel := context.WithCancel(ctx)
defer cancel()
result := JobResult{
@@ -39,7 +42,7 @@ func processMigrationJob(
wgQueryColumnTypes.Go(func() error {
var err error
sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(jobCtx, job.SourceTable.TableInfo)
sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo)
if err != nil {
return err
}
@@ -49,7 +52,7 @@ func processMigrationJob(
wgQueryColumnTypes.Go(func() error {
var err error
targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(jobCtx, job.TargetTable.TableInfo)
targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo)
if err != nil {
return err
}
@@ -63,8 +66,15 @@ func processMigrationJob(
return result
}
for _, query := range job.PreSQL {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err
return result
}
}
partitions, err := table_analyzers.PartitionRangeGenerator(
jobCtx,
localCtx,
sourceTableAnalyzer,
job.SourceTable.TableInfo,
job.SourceTable.PrimaryKey,
@@ -88,7 +98,7 @@ func processMigrationJob(
var wgLoaders sync.WaitGroup
go func() {
if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
cancel()
result.Error = err
@@ -96,7 +106,7 @@ func processMigrationJob(
}()
go custom_errors.ExtractorErrorHandler(
jobCtx,
localCtx,
job.Retry,
job.MaxPartitionErrrors,
chExtractorErrors,
@@ -105,7 +115,7 @@ func processMigrationJob(
&wgActivePartitions,
)
go custom_errors.LoaderErrorHandler(
jobCtx,
localCtx,
job.Retry,
job.MaxChunkErrors,
chLoadersErrors,
@@ -120,7 +130,7 @@ func processMigrationJob(
for range maxExtractors {
wgExtractors.Go(func() {
extractor.Exec(
jobCtx,
localCtx,
job.SourceTable,
sourceColTypes,
job.BatchSize,
@@ -146,7 +156,7 @@ func processMigrationJob(
for range maxExtractors {
wgTransformers.Go(func() {
transformer.Exec(
jobCtx,
localCtx,
sourceColTypes,
chBatchesRaw,
chBatchesTransformed,
@@ -161,7 +171,7 @@ func processMigrationJob(
for range job.MaxLoaders {
wgLoaders.Go(func() {
loader.Exec(
jobCtx,
localCtx,
job.TargetTable,
targetColTypes,
chBatchesTransformed,
@@ -204,8 +214,15 @@ func processMigrationJob(
cancel()
}()
for _, query := range job.PostSQL {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err
return result
}
}
log.Debugf("waiting for local context to be done (%v)", job.Name)
<-jobCtx.Done()
<-localCtx.Done()
log.Debugf("local context done (%v)", job.Name)
if ctx.Err() != nil {