feat: refactor migration job structure to use SourceTableInfo and TargetTableInfo for improved configuration handling
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"time"
 
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"github.com/google/uuid"
 )
 
@@ -19,7 +20,7 @@ type Batch struct {
 	RetryCounter int
 }
 
-func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, job MigrationJob) (int64, error) {
+func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) (int64, error) {
 	query := `
 SELECT
 	SUM(p.rows) AS count
@@ -33,7 +34,7 @@ GROUP BY t.name`
 	defer cancel()
 
 	var rowsCount int64
-	err := db.QueryRowContext(ctxTimeout, query, sql.Named("schema", job.Schema), sql.Named("table", job.Table)).Scan(&rowsCount)
+	err := db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
 	if err != nil {
 		return 0, err
 	}
@@ -41,7 +42,7 @@ GROUP BY t.name`
 	return rowsCount, nil
 }
 
-func calculateBatchesMssql(ctx context.Context, db *sql.DB, job MigrationJob, batchCount int64) ([]Batch, error) {
+func calculateBatchesMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, batchCount int64) ([]Batch, error) {
 	query := fmt.Sprintf(`
 SELECT
 	MIN([%s]) AS lower_limit,
@@ -49,7 +50,13 @@ SELECT
 FROM
 	(SELECT [%s], NTILE(@batchCount) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T
 GROUP BY batch_id
-ORDER BY batch_id`, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.Schema, job.Table)
+ORDER BY batch_id`,
+		tableInfo.PrimaryKey,
+		tableInfo.PrimaryKey,
+		tableInfo.PrimaryKey,
+		tableInfo.PrimaryKey,
+		tableInfo.Schema,
+		tableInfo.Table)
 
 	ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
 	defer cancel()
@@ -84,8 +91,8 @@ ORDER BY batch_id`, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.PrimaryK
 	return batches, nil
 }
 
-func batchGeneratorMssql(ctx context.Context, db *sql.DB, job MigrationJob) ([]Batch, error) {
-	rowsCount, err := estimateTotalRowsMssql(ctx, db, job)
+func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) ([]Batch, error) {
+	rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo)
 	if err != nil {
 		return nil, err
 	}
@@ -101,7 +108,7 @@ func batchGeneratorMssql(ctx context.Context, db *sql.DB, job MigrationJob) ([]B
 		}}, nil
 	}
 
-	batches, err := calculateBatchesMssql(ctx, db, job, batchCount)
+	batches, err := calculateBatchesMssql(ctx, db, tableInfo, batchCount)
 	if err != nil {
 		return nil, err
 	}
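The NTILE trick here is worth spelling out: NTILE(@batchCount) assigns every row, ordered by the primary key, to one of @batchCount roughly equal-sized groups, so the per-group MIN/MAX yield contiguous, non-overlapping key ranges that extractors can scan independently. With the Red.PUERTO job from the old hard-coded list, the rewritten call renders roughly the query below; the select-list line hidden by the hunk boundary is presumably MAX([%s]) AS upper_limit, which accounts for the fourth PrimaryKey argument.

SELECT
    MIN([ID_PUERTO]) AS lower_limit,
    MAX([ID_PUERTO]) AS upper_limit
FROM
    (SELECT [ID_PUERTO], NTILE(@batchCount) OVER (ORDER BY [ID_PUERTO]) AS batch_id FROM [Red].[PUERTO]) AS T
GROUP BY batch_id
ORDER BY batch_id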
@@ -3,9 +3,11 @@ package main
 import (
 	"fmt"
 	"strings"
+
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 )
 
-func buildExtractQueryMssql(job MigrationJob, columns []ColumnType, includeRange bool, isMinInclusive bool) string {
+func buildExtractQueryMssql(sourceDbInfo config.SourceTableInfo, columns []ColumnType, includeRange bool, isMinInclusive bool) string {
 	var sbQuery strings.Builder
 
 	sbQuery.WriteString("SELECT ")
@@ -26,25 +28,25 @@ func buildExtractQueryMssql(job MigrationJob, columns []ColumnType, includeRange
 		}
 	}
 
-	fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", job.Schema, job.Table)
+	fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", sourceDbInfo.Schema, sourceDbInfo.Table)
 
 	if includeRange {
-		fmt.Fprintf(&sbQuery, " WHERE [%s]", job.PrimaryKey)
+		fmt.Fprintf(&sbQuery, " WHERE [%s]", sourceDbInfo.PrimaryKey)
 		if isMinInclusive {
 			sbQuery.WriteString(" >=")
 		} else {
 			sbQuery.WriteString(" >")
 		}
 
-		fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", job.PrimaryKey)
+		fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", sourceDbInfo.PrimaryKey)
 	}
 
-	fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", job.PrimaryKey)
+	fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", sourceDbInfo.PrimaryKey)
 
 	return sbQuery.String()
 }
 
-func buildExtractQueryPostgres(job MigrationJob, columns []ColumnType) string {
+func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []ColumnType) string {
 	var sbColumns strings.Builder
 
 	if len(columns) == 0 {
@@ -69,5 +71,5 @@ func buildExtractQueryPostgres(job MigrationJob, columns []ColumnType) string {
 		}
 	}
 
-	return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), job.Schema, job.Table, job.PrimaryKey)
+	return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
 }
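For a concrete sense of the builders' output: with the Red.PUERTO table info, includeRange=true and isMinInclusive=false, buildExtractQueryMssql produces a query of this shape (the select list comes from the columns slice and is elided here):

SELECT [ID_PUERTO], ... FROM [Red].[PUERTO] WHERE [ID_PUERTO] > @min AND [ID_PUERTO] <= @max ORDER BY [ID_PUERTO] ASC

The >= / > switch presumably exists because adjacent batch ranges meet at shared boundary keys: only the batch that owns the minimum value includes it, so no row is extracted twice. The Postgres variant is the quoted-identifier equivalent, SELECT ... FROM "Red"."PUERTO" ORDER BY "ID_PUERTO" ASC, with no range clause.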
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"github.com/google/uuid"
 	"github.com/jackc/pgx/v5/pgxpool"
 	_ "github.com/microsoft/go-mssqldb"
@@ -27,7 +28,7 @@ type Chunk struct {
 func extractFromMssql(
 	ctx context.Context,
 	db *sql.DB,
-	job MigrationJob,
+	tableInfo config.SourceTableInfo,
 	columns []ColumnType,
 	chunkSize int,
 	chBatchesIn <-chan Batch,
@@ -37,7 +38,7 @@ func extractFromMssql(
 	wgActiveBatches *sync.WaitGroup,
 ) {
 	indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
-		return strings.EqualFold(col.name, job.PrimaryKey)
+		return strings.EqualFold(col.name, tableInfo.PrimaryKey)
 	})
 
 	if indexPrimaryKey == -1 {
@@ -68,7 +69,7 @@ func extractFromMssql(
 			return
 		}
 
-		if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort {
+		if abort := processBatch(ctx, db, tableInfo, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort {
 			return
 		}
 	}
@@ -78,7 +79,7 @@ func extractFromMssql(
 func processBatch(
 	ctx context.Context,
 	db *sql.DB,
-	job MigrationJob,
+	tableInfo config.SourceTableInfo,
 	columns []ColumnType,
 	chunkSize int,
 	batch Batch,
@@ -87,7 +88,7 @@ func processBatch(
 	chErrorsOut chan<- ExtractorError,
 	wgActiveBatches *sync.WaitGroup,
 ) (abort bool) {
-	query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
+	query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
 	log.Debug("Query used to extract data from mssql: ", query)
 
 	var queryArgs []any
@@ -206,8 +207,8 @@ func processBatch(
 	return false
 }
 
-func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error {
-	query := buildExtractQueryPostgres(job, columns)
+func extractFromPostgres(ctx context.Context, tableInfo config.SourceTableInfo, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error {
+	query := buildExtractQueryPostgres(tableInfo, columns)
 	log.Debug("Query used to extract data from postgres: ", query)
 
 	rows, err := db.Query(ctx, query)
@@ -229,13 +230,13 @@ func extractFromPostgres(ctx context.Context, job MigrationJob, columns []Column
 		if len(rowsChunk) >= chunkSize {
 			out <- rowsChunk
 			rowsChunk = make([]UnknownRowValues, 0, chunkSize)
-			log.Infof("Chunk send... %+v", job)
+			log.Infof("Chunk sent... %+v", tableInfo)
 		}
 	}
 
 	if len(rowsChunk) > 0 {
 		out <- rowsChunk
-		log.Infof("Chunk send... %+v", job)
+		log.Infof("Chunk sent... %+v", tableInfo)
 	}
 
 	return nil
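The extractor side is a classic fan-out: several extractFromMssql workers drain one shared batch channel and emit chunks downstream, with WaitGroups and error channels coordinating shutdown. A minimal sketch of that shape, with simplified types and none of the real error plumbing (wg.Go as used elsewhere in this codebase):

package main

import (
    "fmt"
    "sync"
)

// Batch stands in for the real Batch and its primary-key range.
type Batch struct{ Lower, Upper int64 }

func main() {
    chBatches := make(chan Batch)
    chChunks := make(chan []int64)

    var wg sync.WaitGroup
    for range 4 { // mirrors "for range maxExtractors"
        wg.Go(func() {
            // Each worker drains the shared channel until it is closed.
            for batch := range chBatches {
                chChunks <- []int64{batch.Lower, batch.Upper} // stand-in chunk
            }
        })
    }

    go func() {
        for _, b := range []Batch{{1, 100}, {101, 200}, {201, 300}} {
            chBatches <- b
        }
        close(chBatches)
        wg.Wait() // all workers finished; nothing more will be emitted
        close(chChunks)
    }()

    for chunk := range chChunks {
        fmt.Println("chunk:", chunk)
    }
}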
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"github.com/jackc/pgx/v5/pgxpool"
 	_ "github.com/microsoft/go-mssqldb"
 	log "github.com/sirupsen/logrus"
@@ -111,7 +112,7 @@ func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, sc
 	return column
 }
 
-func GetColumnTypesPostgres(db *pgxpool.Pool, migrationJob MigrationJob) ([]ColumnType, error) {
+func GetColumnTypesPostgres(db *pgxpool.Pool, tableInfo config.TargetTableInfo) ([]ColumnType, error) {
 	query := `
 SELECT
 	c.column_name AS name,
@@ -129,7 +130,7 @@ ORDER BY c.ordinal_position;
 	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
 	defer cancel()
 
-	rows, err := db.Query(ctx, query, migrationJob.Schema, migrationJob.Table)
+	rows, err := db.Query(ctx, query, tableInfo.Schema, tableInfo.Table)
 	if err != nil {
 		return nil, fmt.Errorf("Error querying column types: %w", err)
 	}
@@ -197,7 +198,7 @@ func MapMssqlColumn(column ColumnType) ColumnType {
 	return column
 }
 
-func GetColumnTypesMssql(db *sql.DB, migrationJob MigrationJob) ([]ColumnType, error) {
+func GetColumnTypesMssql(db *sql.DB, tableInfo config.SourceTableInfo) ([]ColumnType, error) {
 	query := `
 SELECT
 	c.name AS name,
@@ -219,7 +220,7 @@ ORDER BY c.column_id;
 	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
 	defer cancel()
 
-	rows, err := db.QueryContext(ctx, query, sql.Named("schema", migrationJob.Schema), sql.Named("table", migrationJob.Table))
+	rows, err := db.QueryContext(ctx, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
 	if err != nil {
 		return nil, fmt.Errorf("Error querying column types: %w", err)
 	}
@@ -252,7 +253,12 @@ ORDER BY c.column_id;
 	return colTypes, nil
 }
 
-func GetColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob MigrationJob) ([]ColumnType, []ColumnType, error) {
+func GetColumnTypes(
+	sourceDb *sql.DB,
+	targetDb *pgxpool.Pool,
+	sourceTable config.SourceTableInfo,
+	targetTable config.TargetTableInfo,
+) ([]ColumnType, []ColumnType, error) {
 	var sourceDbErr error
 	var targetDbErr error
 	var sourceColTypes []ColumnType
@@ -260,14 +266,14 @@ func GetColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob Migra
 	var wg sync.WaitGroup
 
 	wg.Go(func() {
-		sourceColTypes, sourceDbErr = GetColumnTypesMssql(sourceDb, migrationJob)
+		sourceColTypes, sourceDbErr = GetColumnTypesMssql(sourceDb, sourceTable)
 		if sourceDbErr != nil {
 			log.Error("Error (sourceDb): ", sourceDbErr)
 		}
 	})
 
 	wg.Go(func() {
-		targetColTypes, targetDbErr = GetColumnTypesPostgres(targetDb, migrationJob)
+		targetColTypes, targetDbErr = GetColumnTypesPostgres(targetDb, targetTable)
 		if targetDbErr != nil {
 			log.Error("Error (targetDb): ", targetDbErr)
 		}
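One detail reviewers on older toolchains may trip over: wg.Go is sync.WaitGroup.Go, added in Go 1.25, which runs the closure in a new goroutine and handles the Add(1)/Done pair internally. The parallel metadata fetch reduces to this pattern:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var sourceCols, targetCols string
    var wg sync.WaitGroup

    // Both lookups run concurrently; Wait blocks until each closure returns.
    wg.Go(func() { sourceCols = "mssql column types" })
    wg.Go(func() { targetCols = "postgres column types" })

    wg.Wait() // the goroutines' writes are visible after Wait returns
    fmt.Println(sourceCols, targetCols)
}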
@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/jackc/pgx/v5/pgxpool"
@@ -18,14 +19,14 @@ import (
 func loadRowsPostgres(
 	ctx context.Context,
 	db *pgxpool.Pool,
-	job MigrationJob,
+	tableInfo config.TargetTableInfo,
 	columns []ColumnType,
 	chChunksIn <-chan Chunk,
 	chErrorsOut chan<- LoaderError,
 	chJobErrorsOut chan<- JobError,
 	wgActiveChunks *sync.WaitGroup,
 ) {
-	tableId := pgx.Identifier{job.Schema, job.Table}
+	tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
 	colNames := Map(columns, func(col ColumnType) string {
 		return col.name
 	})
@@ -102,7 +103,7 @@ func loadChunkPostgres(
 	return false
 }
 
-func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error {
+func loadRowsMssql(ctx context.Context, tableInfo config.TargetTableInfo, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error {
 	chunkCount := 0
 	totalRowsLoaded := 0
 
@@ -114,7 +115,7 @@ func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType,
 		return fmt.Errorf("error starting transaction: %w", err)
 	}
 
-	fullTableName := fmt.Sprintf("[%s].[%s]", job.Schema, job.Table)
+	fullTableName := fmt.Sprintf("[%s].[%s]", tableInfo.Schema, tableInfo.Table)
 	colNames := Map(columns, func(col ColumnType) string {
 		return col.name
 	})
@@ -177,14 +178,13 @@ func Map[T any, V any](input []T, mapper func(T) V) []V {
 	return result
 }
 
-func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
-
+func fakeLoader(tableInfo config.TargetTableInfo, columns []ColumnType, in <-chan [][]any) {
 	for rows := range in {
 		log.Debugf("Chunk received, loading data into...")
 
 		for i, rowValues := range rows {
 			if i%100 == 0 {
-				logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
+				logSampleRow(tableInfo.Schema, tableInfo.Table, columns, rowValues, fmt.Sprintf("row %d", i))
 			}
 		}
 	}
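loadChunkPostgres itself is outside these hunks, but building a pgx.Identifier plus a column-name slice is the usual setup for pgx's COPY path, so the load step presumably amounts to something like the sketch below (table and column names here are hypothetical):

package main

import (
    "context"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
)

// copyChunk is a hypothetical stand-in for the real load step.
func copyChunk(ctx context.Context, db *pgxpool.Pool, rows [][]any) (int64, error) {
    // pgx.Identifier quotes each part, yielding "red"."puerto".
    tableId := pgx.Identifier{"red", "puerto"}

    // Column order must match the value order within each row.
    return db.CopyFrom(ctx, tableId, []string{"id_puerto", "nombre"},
        pgx.CopyFromRows(rows))
}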
@@ -4,28 +4,10 @@ import (
 	"context"
 	"time"
 
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	log "github.com/sirupsen/logrus"
 )
 
-type MigrationJob struct {
-	Schema     string
-	Table      string
-	PrimaryKey string
-}
-
-var migrationJobs []MigrationJob = []MigrationJob{
-	{
-		Schema:     "Cartografia",
-		Table:      "MANZANA",
-		PrimaryKey: "GDB_ARCHIVE_OID",
-	},
-	{
-		Schema:     "Red",
-		Table:      "PUERTO",
-		PrimaryKey: "ID_PUERTO",
-	},
-}
-
 const (
 	NumExtractors int = 4
 	NumLoaders    int = 8
@@ -37,6 +19,14 @@ const (
 
 func main() {
 	configureLog()
 
+	migrationConfig, err := config.ReadMigrationConfig()
+	if err != nil {
+		log.Fatalf("error reading configuration: %v", err)
+	}
+
+	log.Debugf("Config: %+v", migrationConfig)
+
 	startTime := time.Now()
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -53,8 +43,8 @@ func main() {
 	defer sourceDb.Close()
 	defer targetDb.Close()
 
-	for _, job := range migrationJobs {
-		log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table)
+	for _, job := range migrationConfig.Jobs {
+		log.Infof(">>> Processing job: %s.%s <<<", job.SourceTable.Schema, job.SourceTable.Table)
 		processMigrationJob(ctx, sourceDb, targetDb, job)
 	}
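The two hard-coded jobs removed here map directly onto the new YAML schema. Assuming the target tables keep the same names (the old MigrationJob had no separate target) and with made-up job names, an equivalent config.yaml would be:

jobs:
  - name: manzanas
    source:
      schema: Cartografia
      table: MANZANA
      primary_key: GDB_ARCHIVE_OID
    target:
      schema: Cartografia
      table: MANZANA
  - name: puertos
    source:
      schema: Red
      table: PUERTO
      primary_key: ID_PUERTO
    target:
      schema: Red
      table: PUERTO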
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"
 
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"github.com/jackc/pgx/v5/pgxpool"
 
 	_ "github.com/microsoft/go-mssqldb"
@@ -16,12 +17,12 @@ func processMigrationJob(
 	ctx context.Context,
 	sourceDb *sql.DB,
 	targetDb *pgxpool.Pool,
-	job MigrationJob,
+	job config.Job,
 ) {
 	jobStartTime := time.Now()
-	log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
+	log.Infof("Starting migration job: %s.%s [PK: %s]", job.SourceTable.Schema, job.SourceTable.Table, job.SourceTable.PrimaryKey)
 
-	sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job)
+	sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable)
 	if err != nil {
 		log.Fatal("Unexpected error: ", err)
 	}
@@ -32,7 +33,7 @@ func processMigrationJob(
 	jobCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	batches, err := batchGeneratorMssql(jobCtx, sourceDb, job)
+	batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable)
 	if err != nil {
 		log.Error("Unexpected error calculating batch ranges: ", err)
 	}
@@ -65,7 +66,7 @@ func processMigrationJob(
 
 	for range maxExtractors {
 		wgExtractors.Go(func() {
-			extractFromMssql(jobCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
+			extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
 		})
 	}
 
@@ -90,7 +91,7 @@ func processMigrationJob(
 
 	for range NumLoaders {
 		wgLoaders.Go(func() {
-			loadRowsPostgres(jobCtx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
+			loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
 		})
 	}
 
@@ -128,8 +129,14 @@ func logColumnTypes(columnTypes []ColumnType, label string) {
 	}
 }
 
-func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) {
-	log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag)
+func logSampleRow(
+	schema string,
+	table string,
+	columns []ColumnType,
+	rowValues UnknownRowValues,
+	tag string,
+) {
+	log.Infof("[%s.%s] Sample row: (%s)", schema, table, tag)
 	for i, col := range columns {
 		log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
 	}
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"os"
 
-	log "github.com/sirupsen/logrus"
 	"gopkg.in/yaml.v3"
 )
 
@@ -13,35 +12,36 @@ type RetryConfig struct {
 }
 
 type JobConfig struct {
 	MaxExtractors  int         `yaml:"max_extractors"`
 	MaxLoaders     int         `yaml:"max_loaders"`
 	QueueSize      int         `yaml:"queue_size"`
 	ChunkSize      int         `yaml:"chunk_size"`
 	ChunksPerBatch int         `yaml:"chunks_per_batch"`
+	RowsPerBatch   int64
 	TruncateTarget bool        `yaml:"truncate_target"`
 	TruncateMethod string      `yaml:"truncate_method"`
 	Retry          RetryConfig `yaml:"retry"`
 }
 
-type SourceDbInfo struct {
+type TargetTableInfo struct {
+	Schema string `yaml:"schema"`
+	Table  string `yaml:"table"`
+}
+
+type SourceTableInfo struct {
 	Schema     string `yaml:"schema"`
 	Table      string `yaml:"table"`
 	PrimaryKey string `yaml:"primary_key"`
 }
 
-type TargetDbInfo struct {
-	Schema string `yaml:"schema"`
-	Table  string `yaml:"table"`
-}
-
 type Job struct {
 	Name    string `yaml:"name"`
 	Enabled bool   `yaml:"enabled"`
-	Source      SourceDbInfo    `yaml:"source"`
-	Target      TargetDbInfo    `yaml:"target"`
+	SourceTable SourceTableInfo `yaml:"source"`
+	TargetTable TargetTableInfo `yaml:"target"`
 	PreSQL  []string `yaml:"pre_sql"`
 	PostSQL []string `yaml:"post_sql"`
 	JobConfig `yaml:",inline"`
 }
 
 type MigrationConfig struct {
@@ -51,9 +51,9 @@ type MigrationConfig struct {
 }
 
 type rawConfig struct {
-	maxParallelWorkers int         `yaml:"max_parallel_workers"`
-	defaults           JobConfig   `yaml:"defaults"`
-	jobs               []yaml.Node `yaml:"jobs"`
+	MaxParallelWorkers int         `yaml:"max_parallel_workers"`
+	Defaults           JobConfig   `yaml:"defaults"`
+	Jobs               []yaml.Node `yaml:"jobs"`
 }
 
 func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
@@ -62,25 +62,28 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
 		return err
 	}
 
-	c.MaxParallelWorkers = raw.maxParallelWorkers
-	c.Defaults = raw.defaults
+	c.MaxParallelWorkers = raw.MaxParallelWorkers
+	c.Defaults = raw.Defaults
+	c.Defaults.RowsPerBatch = int64(raw.Defaults.ChunkSize * raw.Defaults.ChunksPerBatch)
 
-	for _, node := range raw.jobs {
+	for _, node := range raw.Jobs {
 		job := Job{
-			JobConfig: raw.defaults,
+			JobConfig: raw.Defaults,
 		}
 
 		if err := node.Decode(&job); err != nil {
 			return err
 		}
 
+		job.RowsPerBatch = int64(job.ChunkSize * job.ChunksPerBatch)
+
 		c.Jobs = append(c.Jobs, job)
 	}
 
 	return nil
 }
 
-const defaultConfigFileName string = "config.yml"
+const defaultConfigFileName string = "config.yaml"
 
 func filenamesOrDefault(filenames []string) []string {
 	if len(filenames) == 0 {
@@ -108,7 +111,7 @@ func ReadMigrationConfig(filenames ...string) (MigrationConfig, error) {
 
 	var config MigrationConfig
 	if err := yaml.Unmarshal(data, &config); err != nil {
-		log.Fatalf("Error parsing config file: %v", err)
+		return MigrationConfig{}, fmt.Errorf("Error parsing config file: %v", err)
 	}
 
 	return config, nil
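Two things in this hunk are easy to miss. First, exporting the rawConfig fields is a functional fix, not cosmetics: yaml.v3, like every Go encoding package, cannot set unexported struct fields, so max_parallel_workers, defaults, and jobs silently decoded to zero values before. Second, each job starts from a copy of Defaults and only the keys present in its own YAML node override it, after which RowsPerBatch is derived rather than read. A minimal round-trip with illustrative values:

package main

import (
    "fmt"

    "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
    "gopkg.in/yaml.v3"
)

const doc = `
max_parallel_workers: 2
defaults:
  chunk_size: 1000
  chunks_per_batch: 10
jobs:
  - name: puertos
    source: {schema: Red, table: PUERTO, primary_key: ID_PUERTO}
    target: {schema: red, table: puerto}
    chunks_per_batch: 50
`

func main() {
    var cfg config.MigrationConfig
    if err := yaml.Unmarshal([]byte(doc), &cfg); err != nil {
        panic(err)
    }
    // chunk_size 1000 is inherited from defaults, chunks_per_batch 50
    // overrides the default 10, so the derived RowsPerBatch is 50000.
    fmt.Println(cfg.Jobs[0].Name, cfg.Jobs[0].RowsPerBatch)
}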
@@ -1,77 +1,17 @@
 package main
 
 import (
-	"gopkg.in/yaml.v3"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	log "github.com/sirupsen/logrus"
 )
 
-type RetryConfig struct {
-	Attempts int `yaml:"attempts"`
-}
-
-type JobConfig struct {
-	MaxExtractors  int         `yaml:"max_extractors"`
-	MaxLoaders     int         `yaml:"max_loaders"`
-	QueueSize      int         `yaml:"queue_size"`
-	ChunkSize      int         `yaml:"chunk_size"`
-	ChunksPerBatch int         `yaml:"chunks_per_batch"`
-	TruncateTarget bool        `yaml:"truncate_target"`
-	TruncateMethod string      `yaml:"truncate_method"`
-	Retry          RetryConfig `yaml:"retry"`
-}
-
-type SourceDbInfo struct {
-	Schema     string `yaml:"schema"`
-	Table      string `yaml:"table"`
-	PrimaryKey string `yaml:"primary_key"`
-}
-
-type TargetDbInfo struct {
-	Schema string `yaml:"schema"`
-	Table  string `yaml:"table"`
-}
-
-type Job struct {
-	Name    string       `yaml:"name"`
-	Enabled bool         `yaml:"enabled"`
-	Source  SourceDbInfo `yaml:"source"`
-	Target  TargetDbInfo `yaml:"target"`
-	PreSQL  []string     `yaml:"pre_sql"`
-	PostSQL []string     `yaml:"post_sql"`
-	JobConfig `yaml:",inline"`
-}
-
-type MigrationConfig struct {
-	MaxParallelWorkers int       `yaml:"max_parallel_workers"`
-	Defaults           JobConfig `yaml:"defaults"`
-	Jobs               []Job     `yaml:"jobs"`
-}
-
-type rawConfig struct {
-	maxParallelWorkers int         `yaml:"max_parallel_workers"`
-	defaults           JobConfig   `yaml:"defaults"`
-	jobs               []yaml.Node `yaml:"jobs"`
-}
-
-func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
-	var raw rawConfig
-	if err := value.Decode(&raw); err != nil {
-		return err
-	}
-
-	c.MaxParallelWorkers = raw.maxParallelWorkers
-	c.Defaults = raw.defaults
-
-	for _, node := range raw.jobs {
-		job := Job{
-			JobConfig: raw.defaults,
-		}
-
-		if err := node.Decode(&job); err != nil {
-			return err
-		}
-
-		c.Jobs = append(c.Jobs, job)
-	}
-
-	return nil
-}
+func main() {
+	log.SetLevel(log.DebugLevel)
+
+	migrationConfig, err := config.ReadMigrationConfig()
+	if err != nil {
+		log.Fatalf("error reading configuration: %v", err)
+	}
+
+	log.Debugf("Config: %+v", migrationConfig)
+}