diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go index fb242e9..9cf34e4 100644 --- a/cmd/go_migrate/extractor.go +++ b/cmd/go_migrate/extractor.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" + "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" ) @@ -50,3 +51,38 @@ func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnTyp return nil } + +func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error { + query := buildExtractQueryPostgres(job, columns) + log.Debug("Query used to extract data from postgres: ", query) + + rows, err := db.Query(ctx, query) + if err != nil { + return err + } + defer rows.Close() + + rowsChunk := make([]UnknownRowValues, 0, chunkSize) + + for rows.Next() { + values, err := rows.Values() + if err != nil { + return err + } + + rowsChunk = append(rowsChunk, values) + + if len(rowsChunk) >= chunkSize { + out <- rowsChunk + rowsChunk = make([]UnknownRowValues, 0, chunkSize) + log.Infof("Chunk send... %+v", job) + } + } + + if len(rowsChunk) > 0 { + out <- rowsChunk + log.Infof("Chunk send... %+v", job) + } + + return nil +} diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 70ce92b..0d2c2b0 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "sync" "github.com/jackc/pgx/v5/pgxpool" @@ -21,15 +22,47 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration logColumnTypes(targetColTypes, "Target col types") chRowsExtract := make(chan []UnknownRowValues, QueueSize) - mssqlContext := context.Background() + var wgMssqlExtractors sync.WaitGroup - if err := extractFromMssql(mssqlContext, job, sourceColTypes, ChunkSize, sourceDb, chRowsExtract); err != nil { - log.Fatal("Unexpected error extrating data from mssql: ", err) - } - close(chRowsExtract) + wgMssqlExtractors.Go(func() { + if err := extractFromMssql(mssqlContext, job, sourceColTypes, ChunkSize, sourceDb, chRowsExtract); err != nil { + log.Error("Unexpected error extrating data from mssql: ", err) + } + }) - transformRowsMssql(job, sourceColTypes, chRowsExtract) + go func() { + wgMssqlExtractors.Wait() + close(chRowsExtract) + }() + + var wgMssqlTransformers sync.WaitGroup + wgMssqlTransformers.Go(func() { + transformRows(job, sourceColTypes, "sqlserver", chRowsExtract) + }) + wgMssqlTransformers.Wait() + + chRowsExtractPostgres := make(chan []UnknownRowValues, QueueSize) + postgresContext := context.Background() + var wgPostgresExtractors sync.WaitGroup + + wgPostgresExtractors.Go(func() { + if err := extractFromPostgres(postgresContext, job, sourceColTypes, ChunkSize, targetDb, chRowsExtractPostgres); err != nil { + log.Error("Unexpected error extrating data from postgres: ", err) + } + }) + + go func() { + wgPostgresExtractors.Wait() + close(chRowsExtractPostgres) + }() + + var wgPostgresTransformers sync.WaitGroup + wgPostgresTransformers.Go(func() { + transformRows(job, sourceColTypes, "postgres", chRowsExtractPostgres) + }) + + wgPostgresTransformers.Wait() } func logColumnTypes(columnTypes []ColumnType, label string) { @@ -40,9 +73,9 @@ func logColumnTypes(columnTypes []ColumnType, label string) { } } -func transformRowsMssql(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) { +func transformRows(job MigrationJob, columns []ColumnType, driver string, in <-chan []UnknownRowValues) { for rows := range in { - log.Debug("Chunk received, transforming...") + log.Debugf("Chunk received (%s), transforming...", driver) for i, rowValues := range rows { if i%100 == 0 {