feat: refactor data extraction and loading functions for improved context handling
This commit is contained in:
@@ -9,7 +9,7 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type UnknownRowValues []any
|
||||
type UnknownRowValues = []any
|
||||
|
||||
func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *sql.DB, out chan<- []UnknownRowValues) error {
|
||||
query := buildExtractQueryMssql(job, columns)
|
||||
@@ -40,13 +40,13 @@ func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnTyp
|
||||
if len(rowsChunk) >= chunkSize {
|
||||
out <- rowsChunk
|
||||
rowsChunk = make([]UnknownRowValues, 0, chunkSize)
|
||||
log.Infof("Chunk send... %+v", job)
|
||||
log.Debugf("Chunk send... %+v", job)
|
||||
}
|
||||
}
|
||||
|
||||
if len(rowsChunk) > 0 {
|
||||
out <- rowsChunk
|
||||
log.Infof("Chunk send... %+v", job)
|
||||
log.Debugf("Chunk send... %+v", job)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) {
|
||||
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
|
||||
for rows := range in {
|
||||
log.Debugf("Chunk received, loading data into...")
|
||||
|
||||
@@ -17,3 +20,35 @@ func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowVa
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error {
|
||||
for rows := range in {
|
||||
identifier := pgx.Identifier{job.Schema, job.Table}
|
||||
colNames := Map(columns, func(col ColumnType) string {
|
||||
return col.name
|
||||
})
|
||||
|
||||
_, err := db.CopyFrom(
|
||||
ctx,
|
||||
identifier,
|
||||
colNames,
|
||||
pgx.CopyFromRows(rows),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Map applies mapper to every element of input, in order, and returns the
// results as a new slice with the same length as input. The returned slice is
// never nil, even when input is nil or empty.
func Map[T any, V any](input []T, mapper func(T) V) []V {
	mapped := make([]V, 0, len(input))
	for idx := range input {
		mapped = append(mapped, mapper(input[idx]))
	}
	return mapped
}
|
||||
|
||||
@@ -11,5 +11,5 @@ func configureLog() {
|
||||
FullTimestamp: true,
|
||||
TimestampFormat: time.StampMilli,
|
||||
})
|
||||
log.SetLevel(log.DebugLevel)
|
||||
log.SetLevel(log.InfoLevel)
|
||||
}
|
||||
|
||||
@@ -19,8 +19,9 @@ var migrationJobs []MigrationJob = []MigrationJob{
|
||||
}
|
||||
|
||||
const (
|
||||
NumExtractors int = 2
|
||||
ChunkSize int = 20
|
||||
NumExtractors int = 1
|
||||
NumLoaders int = 2
|
||||
ChunkSize int = 50000
|
||||
QueueSize int = 10
|
||||
)
|
||||
|
||||
|
||||
@@ -22,10 +22,10 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
|
||||
|
||||
chRowsExtract := make(chan []UnknownRowValues, QueueSize)
|
||||
chRowsTransform := make(chan []UnknownRowValues)
|
||||
mssqlContext := context.Background()
|
||||
mssqlCtx := context.Background()
|
||||
|
||||
go func() {
|
||||
if err := extractFromMssql(mssqlContext, job, sourceColTypes, ChunkSize, sourceDb, chRowsExtract); err != nil {
|
||||
if err := extractFromMssql(mssqlCtx, job, sourceColTypes, ChunkSize, sourceDb, chRowsExtract); err != nil {
|
||||
log.Error("Unexpected error extrating data from mssql: ", err)
|
||||
}
|
||||
close(chRowsExtract)
|
||||
@@ -36,27 +36,18 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
|
||||
close(chRowsTransform)
|
||||
}()
|
||||
|
||||
var wgFakeLoaders sync.WaitGroup
|
||||
var wgPostgresLoaders sync.WaitGroup
|
||||
postgresLoaderCtx := context.Background()
|
||||
|
||||
wgFakeLoaders.Go(func() {
|
||||
fakeLoader(job, sourceColTypes, chRowsTransform)
|
||||
})
|
||||
|
||||
chRowsExtractPostgres := make(chan []UnknownRowValues, QueueSize)
|
||||
postgresContext := context.Background()
|
||||
|
||||
go func() {
|
||||
if err := extractFromPostgres(postgresContext, job, sourceColTypes, ChunkSize, targetDb, chRowsExtractPostgres); err != nil {
|
||||
log.Error("Unexpected error extrating data from postgres: ", err)
|
||||
for range NumLoaders {
|
||||
wgPostgresLoaders.Go(func() {
|
||||
if err := loadRowsPostgres(postgresLoaderCtx, job, sourceColTypes, targetDb, chRowsTransform); err != nil {
|
||||
log.Error("Unexpected error loading data into postgres: ", err)
|
||||
}
|
||||
close(chRowsExtractPostgres)
|
||||
}()
|
||||
|
||||
wgFakeLoaders.Go(func() {
|
||||
fakeLoader(job, targetColTypes, chRowsExtractPostgres)
|
||||
})
|
||||
}
|
||||
|
||||
wgFakeLoaders.Wait()
|
||||
wgPostgresLoaders.Wait()
|
||||
}
|
||||
|
||||
func logColumnTypes(columnTypes []ColumnType, label string) {
|
||||
|
||||
Reference in New Issue
Block a user