From 7bb67ddfcfd6e3dc5fed94038ca3e0186da4366a Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Tue, 7 Apr 2026 12:55:05 -0500 Subject: [PATCH] feat: refactor data extraction and loading functions for improved context handling --- cmd/go_migrate/extractor.go | 6 +++--- cmd/go_migrate/loader.go | 37 ++++++++++++++++++++++++++++++++++++- cmd/go_migrate/log.go | 2 +- cmd/go_migrate/main.go | 5 +++-- cmd/go_migrate/process.go | 33 ++++++++++++--------------------- 5 files changed, 55 insertions(+), 28 deletions(-) diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go index 9cf34e4..b7e9ef2 100644 --- a/cmd/go_migrate/extractor.go +++ b/cmd/go_migrate/extractor.go @@ -9,7 +9,7 @@ import ( log "github.com/sirupsen/logrus" ) -type UnknownRowValues []any +type UnknownRowValues = []any func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *sql.DB, out chan<- []UnknownRowValues) error { query := buildExtractQueryMssql(job, columns) @@ -40,13 +40,13 @@ func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnTyp if len(rowsChunk) >= chunkSize { out <- rowsChunk rowsChunk = make([]UnknownRowValues, 0, chunkSize) - log.Infof("Chunk send... %+v", job) + log.Debugf("Chunk send... %+v", job) } } if len(rowsChunk) > 0 { out <- rowsChunk - log.Infof("Chunk send... %+v", job) + log.Debugf("Chunk send... %+v", job) } return nil diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go index 878da9e..6ea285f 100644 --- a/cmd/go_migrate/loader.go +++ b/cmd/go_migrate/loader.go @@ -1,12 +1,15 @@ package main import ( + "context" "fmt" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" ) -func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) { +func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) { for rows := range in { log.Debugf("Chunk received, loading data into...") @@ -17,3 +20,35 @@ func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowVa } } } + +func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error { + for rows := range in { + identifier := pgx.Identifier{job.Schema, job.Table} + colNames := Map(columns, func(col ColumnType) string { + return col.name + }) + + _, err := db.CopyFrom( + ctx, + identifier, + colNames, + pgx.CopyFromRows(rows), + ) + + if err != nil { + return err + } + } + + return nil +} + +func Map[T any, V any](input []T, mapper func(T) V) []V { + result := make([]V, len(input)) + + for i, v := range input { + result[i] = mapper(v) + } + + return result +} diff --git a/cmd/go_migrate/log.go b/cmd/go_migrate/log.go index 0aff8e9..1e7e741 100644 --- a/cmd/go_migrate/log.go +++ b/cmd/go_migrate/log.go @@ -11,5 +11,5 @@ func configureLog() { FullTimestamp: true, TimestampFormat: time.StampMilli, }) - log.SetLevel(log.DebugLevel) + log.SetLevel(log.InfoLevel) } diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index 93546e7..e8388c8 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -19,8 +19,9 @@ var migrationJobs []MigrationJob = []MigrationJob{ } const ( - NumExtractors int = 2 - ChunkSize int = 20 + NumExtractors int = 1 + NumLoaders int = 2 + ChunkSize int = 50000 QueueSize int = 10 ) diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index afa38dc..ec5aa24 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -22,10 +22,10 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration chRowsExtract := make(chan []UnknownRowValues, QueueSize) chRowsTransform := make(chan []UnknownRowValues) - mssqlContext := context.Background() + mssqlCtx := context.Background() go func() { - if err := extractFromMssql(mssqlContext, job, sourceColTypes, ChunkSize, sourceDb, chRowsExtract); err != nil { + if err := extractFromMssql(mssqlCtx, job, sourceColTypes, ChunkSize, sourceDb, chRowsExtract); err != nil { log.Error("Unexpected error extrating data from mssql: ", err) } close(chRowsExtract) @@ -36,27 +36,18 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration close(chRowsTransform) }() - var wgFakeLoaders sync.WaitGroup + var wgPostgresLoaders sync.WaitGroup + postgresLoaderCtx := context.Background() - wgFakeLoaders.Go(func() { - fakeLoader(job, sourceColTypes, chRowsTransform) - }) + for range NumLoaders { + wgPostgresLoaders.Go(func() { + if err := loadRowsPostgres(postgresLoaderCtx, job, sourceColTypes, targetDb, chRowsTransform); err != nil { + log.Error("Unexpected error loading data into postgres: ", err) + } + }) + } - chRowsExtractPostgres := make(chan []UnknownRowValues, QueueSize) - postgresContext := context.Background() - - go func() { - if err := extractFromPostgres(postgresContext, job, sourceColTypes, ChunkSize, targetDb, chRowsExtractPostgres); err != nil { - log.Error("Unexpected error extrating data from postgres: ", err) - } - close(chRowsExtractPostgres) - }() - - wgFakeLoaders.Go(func() { - fakeLoader(job, targetColTypes, chRowsExtractPostgres) - }) - - wgFakeLoaders.Wait() + wgPostgresLoaders.Wait() } func logColumnTypes(columnTypes []ColumnType, label string) {