From 524d892a60f93ed888bf9b34897acf4201c69e7c Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Thu, 9 Apr 2026 19:20:50 -0500 Subject: [PATCH] feat: refactor migration job structure to use SourceTableInfo and TargetTableInfo for improved configuration handling --- cmd/go_migrate/batch-generator.go | 21 +++++--- cmd/go_migrate/build-extract-query.go | 16 +++--- cmd/go_migrate/extractor.go | 19 +++---- cmd/go_migrate/inspect-columns.go | 20 ++++--- cmd/go_migrate/loader.go | 14 ++--- cmd/go_migrate/main.go | 32 ++++------- cmd/go_migrate/process.go | 23 +++++--- internal/app/config/migration.go | 59 +++++++++++---------- scripts/config-parser/main.go | 76 +++------------------------ 9 files changed, 118 insertions(+), 162 deletions(-) diff --git a/cmd/go_migrate/batch-generator.go b/cmd/go_migrate/batch-generator.go index 78311a9..19f9614 100644 --- a/cmd/go_migrate/batch-generator.go +++ b/cmd/go_migrate/batch-generator.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "github.com/google/uuid" ) @@ -19,7 +20,7 @@ type Batch struct { RetryCounter int } -func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, job MigrationJob) (int64, error) { +func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) (int64, error) { query := ` SELECT SUM(p.rows) AS count @@ -33,7 +34,7 @@ GROUP BY t.name` defer cancel() var rowsCount int64 - err := db.QueryRowContext(ctxTimeout, query, sql.Named("schema", job.Schema), sql.Named("table", job.Table)).Scan(&rowsCount) + err := db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount) if err != nil { return 0, err } @@ -41,7 +42,7 @@ GROUP BY t.name` return rowsCount, nil } -func calculateBatchesMssql(ctx context.Context, db *sql.DB, job MigrationJob, batchCount int64) ([]Batch, error) { +func calculateBatchesMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo, batchCount int64) ([]Batch, error) { query := fmt.Sprintf(` SELECT MIN([%s]) AS lower_limit, @@ -49,7 +50,13 @@ SELECT FROM (SELECT [%s], NTILE(@batchCount) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T GROUP BY batch_id -ORDER BY batch_id`, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.Schema, job.Table) +ORDER BY batch_id`, + tableInfo.PrimaryKey, + tableInfo.PrimaryKey, + tableInfo.PrimaryKey, + tableInfo.PrimaryKey, + tableInfo.Schema, + tableInfo.Table) ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20) defer cancel() @@ -84,8 +91,8 @@ ORDER BY batch_id`, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.PrimaryK return batches, nil } -func batchGeneratorMssql(ctx context.Context, db *sql.DB, job MigrationJob) ([]Batch, error) { - rowsCount, err := estimateTotalRowsMssql(ctx, db, job) +func batchGeneratorMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) ([]Batch, error) { + rowsCount, err := estimateTotalRowsMssql(ctx, db, tableInfo) if err != nil { return nil, err } @@ -101,7 +108,7 @@ func batchGeneratorMssql(ctx context.Context, db *sql.DB, job MigrationJob) ([]B }}, nil } - batches, err := calculateBatchesMssql(ctx, db, job, batchCount) + batches, err := calculateBatchesMssql(ctx, db, tableInfo, batchCount) if err != nil { return nil, err } diff --git a/cmd/go_migrate/build-extract-query.go b/cmd/go_migrate/build-extract-query.go index a43f09e..478a4a8 100644 --- a/cmd/go_migrate/build-extract-query.go +++ b/cmd/go_migrate/build-extract-query.go @@ -3,9 +3,11 @@ package main import ( "fmt" "strings" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" ) -func buildExtractQueryMssql(job MigrationJob, columns []ColumnType, includeRange bool, isMinInclusive bool) string { +func buildExtractQueryMssql(sourceDbInfo config.SourceTableInfo, columns []ColumnType, includeRange bool, isMinInclusive bool) string { var sbQuery strings.Builder sbQuery.WriteString("SELECT ") @@ -26,25 +28,25 @@ func buildExtractQueryMssql(job MigrationJob, columns []ColumnType, includeRange } } - fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", job.Schema, job.Table) + fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", sourceDbInfo.Schema, sourceDbInfo.Table) if includeRange { - fmt.Fprintf(&sbQuery, " WHERE [%s]", job.PrimaryKey) + fmt.Fprintf(&sbQuery, " WHERE [%s]", sourceDbInfo.PrimaryKey) if isMinInclusive { sbQuery.WriteString(" >=") } else { sbQuery.WriteString(" >") } - fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", job.PrimaryKey) + fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", sourceDbInfo.PrimaryKey) } - fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", job.PrimaryKey) + fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", sourceDbInfo.PrimaryKey) return sbQuery.String() } -func buildExtractQueryPostgres(job MigrationJob, columns []ColumnType) string { +func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []ColumnType) string { var sbColumns strings.Builder if len(columns) == 0 { @@ -69,5 +71,5 @@ func buildExtractQueryPostgres(job MigrationJob, columns []ColumnType) string { } } - return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), job.Schema, job.Table, job.PrimaryKey) + return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey) } diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go index 3756d48..68421eb 100644 --- a/cmd/go_migrate/extractor.go +++ b/cmd/go_migrate/extractor.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" @@ -27,7 +28,7 @@ type Chunk struct { func extractFromMssql( ctx context.Context, db *sql.DB, - job MigrationJob, + tableInfo config.SourceTableInfo, columns []ColumnType, chunkSize int, chBatchesIn <-chan Batch, @@ -37,7 +38,7 @@ func extractFromMssql( wgActiveBatches *sync.WaitGroup, ) { indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool { - return strings.EqualFold(col.name, job.PrimaryKey) + return strings.EqualFold(col.name, tableInfo.PrimaryKey) }) if indexPrimaryKey == -1 { @@ -68,7 +69,7 @@ func extractFromMssql( return } - if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort { + if abort := processBatch(ctx, db, tableInfo, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort { return } } @@ -78,7 +79,7 @@ func extractFromMssql( func processBatch( ctx context.Context, db *sql.DB, - job MigrationJob, + tableInfo config.SourceTableInfo, columns []ColumnType, chunkSize int, batch Batch, @@ -87,7 +88,7 @@ func processBatch( chErrorsOut chan<- ExtractorError, wgActiveBatches *sync.WaitGroup, ) (abort bool) { - query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive) + query := buildExtractQueryMssql(tableInfo, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive) log.Debug("Query used to extract data from mssql: ", query) var queryArgs []any @@ -206,8 +207,8 @@ func processBatch( return false } -func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error { - query := buildExtractQueryPostgres(job, columns) +func extractFromPostgres(ctx context.Context, tableInfo config.SourceTableInfo, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error { + query := buildExtractQueryPostgres(tableInfo, columns) log.Debug("Query used to extract data from postgres: ", query) rows, err := db.Query(ctx, query) @@ -229,13 +230,13 @@ func extractFromPostgres(ctx context.Context, job MigrationJob, columns []Column if len(rowsChunk) >= chunkSize { out <- rowsChunk rowsChunk = make([]UnknownRowValues, 0, chunkSize) - log.Infof("Chunk send... %+v", job) + log.Infof("Chunk send... %+v", tableInfo) } } if len(rowsChunk) > 0 { out <- rowsChunk - log.Infof("Chunk send... %+v", job) + log.Infof("Chunk send... %+v", tableInfo) } return nil diff --git a/cmd/go_migrate/inspect-columns.go b/cmd/go_migrate/inspect-columns.go index 4c07f46..e6936c4 100644 --- a/cmd/go_migrate/inspect-columns.go +++ b/cmd/go_migrate/inspect-columns.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" @@ -111,7 +112,7 @@ func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, sc return column } -func GetColumnTypesPostgres(db *pgxpool.Pool, migrationJob MigrationJob) ([]ColumnType, error) { +func GetColumnTypesPostgres(db *pgxpool.Pool, tableInfo config.TargetTableInfo) ([]ColumnType, error) { query := ` SELECT c.column_name AS name, @@ -129,7 +130,7 @@ ORDER BY c.ordinal_position; ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - rows, err := db.Query(ctx, query, migrationJob.Schema, migrationJob.Table) + rows, err := db.Query(ctx, query, tableInfo.Schema, tableInfo.Table) if err != nil { return nil, fmt.Errorf("Error querying column types: %w", err) } @@ -197,7 +198,7 @@ func MapMssqlColumn(column ColumnType) ColumnType { return column } -func GetColumnTypesMssql(db *sql.DB, migrationJob MigrationJob) ([]ColumnType, error) { +func GetColumnTypesMssql(db *sql.DB, tableInfo config.SourceTableInfo) ([]ColumnType, error) { query := ` SELECT c.name AS name, @@ -219,7 +220,7 @@ ORDER BY c.column_id; ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - rows, err := db.QueryContext(ctx, query, sql.Named("schema", migrationJob.Schema), sql.Named("table", migrationJob.Table)) + rows, err := db.QueryContext(ctx, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)) if err != nil { return nil, fmt.Errorf("Error querying column types: %w", err) } @@ -252,7 +253,12 @@ ORDER BY c.column_id; return colTypes, nil } -func GetColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob MigrationJob) ([]ColumnType, []ColumnType, error) { +func GetColumnTypes( + sourceDb *sql.DB, + targetDb *pgxpool.Pool, + sourceTable config.SourceTableInfo, + targetTable config.TargetTableInfo, +) ([]ColumnType, []ColumnType, error) { var sourceDbErr error var targetDbErr error var sourceColTypes []ColumnType @@ -260,14 +266,14 @@ func GetColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob Migra var wg sync.WaitGroup wg.Go(func() { - sourceColTypes, sourceDbErr = GetColumnTypesMssql(sourceDb, migrationJob) + sourceColTypes, sourceDbErr = GetColumnTypesMssql(sourceDb, sourceTable) if sourceDbErr != nil { log.Error("Error (sourceDb): ", sourceDbErr) } }) wg.Go(func() { - targetColTypes, targetDbErr = GetColumnTypesPostgres(targetDb, migrationJob) + targetColTypes, targetDbErr = GetColumnTypesPostgres(targetDb, targetTable) if targetDbErr != nil { log.Error("Error (targetDb): ", targetDbErr) } diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go index ac096cc..f309603 100644 --- a/cmd/go_migrate/loader.go +++ b/cmd/go_migrate/loader.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" @@ -18,14 +19,14 @@ import ( func loadRowsPostgres( ctx context.Context, db *pgxpool.Pool, - job MigrationJob, + tableInfo config.TargetTableInfo, columns []ColumnType, chChunksIn <-chan Chunk, chErrorsOut chan<- LoaderError, chJobErrorsOut chan<- JobError, wgActiveChunks *sync.WaitGroup, ) { - tableId := pgx.Identifier{job.Schema, job.Table} + tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table} colNames := Map(columns, func(col ColumnType) string { return col.name }) @@ -102,7 +103,7 @@ func loadChunkPostgres( return false } -func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error { +func loadRowsMssql(ctx context.Context, tableInfo config.TargetTableInfo, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error { chunkCount := 0 totalRowsLoaded := 0 @@ -114,7 +115,7 @@ func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType, return fmt.Errorf("error starting transaction: %w", err) } - fullTableName := fmt.Sprintf("[%s].[%s]", job.Schema, job.Table) + fullTableName := fmt.Sprintf("[%s].[%s]", tableInfo.Schema, tableInfo.Table) colNames := Map(columns, func(col ColumnType) string { return col.name }) @@ -177,14 +178,13 @@ func Map[T any, V any](input []T, mapper func(T) V) []V { return result } -func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) { - +func fakeLoader(tableInfo config.TargetTableInfo, columns []ColumnType, in <-chan [][]any) { for rows := range in { log.Debugf("Chunk received, loading data into...") for i, rowValues := range rows { if i%100 == 0 { - logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i)) + logSampleRow(tableInfo.Schema, tableInfo.Table, columns, rowValues, fmt.Sprintf("row %d", i)) } } } diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index f40b436..b02ca1a 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -4,28 +4,10 @@ import ( "context" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" log "github.com/sirupsen/logrus" ) -type MigrationJob struct { - Schema string - Table string - PrimaryKey string -} - -var migrationJobs []MigrationJob = []MigrationJob{ - { - Schema: "Cartografia", - Table: "MANZANA", - PrimaryKey: "GDB_ARCHIVE_OID", - }, - { - Schema: "Red", - Table: "PUERTO", - PrimaryKey: "ID_PUERTO", - }, -} - const ( NumExtractors int = 4 NumLoaders int = 8 @@ -37,6 +19,14 @@ const ( func main() { configureLog() + + migrationConfig, err := config.ReadMigrationConfig() + if err != nil { + log.Fatalf("error leyendo configuracion: %v", err) + } + + log.Debugf("Config: %+v", migrationConfig) + startTime := time.Now() ctx, cancel := context.WithCancel(context.Background()) @@ -53,8 +43,8 @@ func main() { defer sourceDb.Close() defer targetDb.Close() - for _, job := range migrationJobs { - log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table) + for _, job := range migrationConfig.Jobs { + log.Infof(">>> Processing job: %s.%s <<<", job.SourceTable.Schema, job.SourceTable.Table) processMigrationJob(ctx, sourceDb, targetDb, job) } diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index aeb267f..1417032 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "github.com/jackc/pgx/v5/pgxpool" _ "github.com/microsoft/go-mssqldb" @@ -16,12 +17,12 @@ func processMigrationJob( ctx context.Context, sourceDb *sql.DB, targetDb *pgxpool.Pool, - job MigrationJob, + job config.Job, ) { jobStartTime := time.Now() - log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey) + log.Infof("Starting migration job: %s.%s [PK: %s]", job.SourceTable.Schema, job.SourceTable.Table, job.SourceTable.PrimaryKey) - sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job) + sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job.SourceTable, job.TargetTable) if err != nil { log.Fatal("Unexpected error: ", err) } @@ -32,7 +33,7 @@ func processMigrationJob( jobCtx, cancel := context.WithCancel(ctx) defer cancel() - batches, err := batchGeneratorMssql(jobCtx, sourceDb, job) + batches, err := batchGeneratorMssql(jobCtx, sourceDb, job.SourceTable) if err != nil { log.Error("Unexpected error calculating batch ranges: ", err) } @@ -65,7 +66,7 @@ func processMigrationJob( for range maxExtractors { wgExtractors.Go(func() { - extractFromMssql(jobCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches) + extractFromMssql(jobCtx, sourceDb, job.SourceTable, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches) }) } @@ -90,7 +91,7 @@ func processMigrationJob( for range NumLoaders { wgLoaders.Go(func() { - loadRowsPostgres(jobCtx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks) + loadRowsPostgres(jobCtx, targetDb, job.TargetTable, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks) }) } @@ -128,8 +129,14 @@ func logColumnTypes(columnTypes []ColumnType, label string) { } } -func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) { - log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag) +func logSampleRow( + schema string, + table string, + columns []ColumnType, + rowValues UnknownRowValues, + tag string, +) { + log.Infof("[%s.%s] Sample row: (%s)", schema, table, tag) for i, col := range columns { log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i]) } diff --git a/internal/app/config/migration.go b/internal/app/config/migration.go index 072c0b0..11c5aff 100644 --- a/internal/app/config/migration.go +++ b/internal/app/config/migration.go @@ -4,7 +4,6 @@ import ( "fmt" "os" - log "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" ) @@ -13,35 +12,36 @@ type RetryConfig struct { } type JobConfig struct { - MaxExtractors int `yaml:"max_extractors"` - MaxLoaders int `yaml:"max_loaders"` - QueueSize int `yaml:"queue_size"` - ChunkSize int `yaml:"chunk_size"` - ChunksPerBatch int `yaml:"chunks_per_batch"` + MaxExtractors int `yaml:"max_extractors"` + MaxLoaders int `yaml:"max_loaders"` + QueueSize int `yaml:"queue_size"` + ChunkSize int `yaml:"chunk_size"` + ChunksPerBatch int `yaml:"chunks_per_batch"` + RowsPerBatch int64 TruncateTarget bool `yaml:"truncate_target"` TruncateMethod string `yaml:"truncate_method"` Retry RetryConfig `yaml:"retry"` } -type SourceDbInfo struct { +type TargetTableInfo struct { + Schema string `yaml:"schema"` + Table string `yaml:"table"` +} + +type SourceTableInfo struct { Schema string `yaml:"schema"` Table string `yaml:"table"` PrimaryKey string `yaml:"primary_key"` } -type TargetDbInfo struct { - Schema string `yaml:"schema"` - Table string `yaml:"table"` -} - type Job struct { - Name string `yaml:"name"` - Enabled bool `yaml:"enabled"` - Source SourceDbInfo `yaml:"source"` - Target TargetDbInfo `yaml:"target"` - PreSQL []string `yaml:"pre_sql"` - PostSQL []string `yaml:"post_sql"` - JobConfig `yaml:",inline"` + Name string `yaml:"name"` + Enabled bool `yaml:"enabled"` + SourceTable SourceTableInfo `yaml:"source"` + TargetTable TargetTableInfo `yaml:"target"` + PreSQL []string `yaml:"pre_sql"` + PostSQL []string `yaml:"post_sql"` + JobConfig `yaml:",inline"` } type MigrationConfig struct { @@ -51,9 +51,9 @@ type MigrationConfig struct { } type rawConfig struct { - maxParallelWorkers int `yaml:"max_parallel_workers"` - defaults JobConfig `yaml:"defaults"` - jobs []yaml.Node `yaml:"jobs"` + MaxParallelWorkers int `yaml:"max_parallel_workers"` + Defaults JobConfig `yaml:"defaults"` + Jobs []yaml.Node `yaml:"jobs"` } func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error { @@ -62,25 +62,28 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error { return err } - c.MaxParallelWorkers = raw.maxParallelWorkers - c.Defaults = raw.defaults + c.MaxParallelWorkers = raw.MaxParallelWorkers + c.Defaults = raw.Defaults + c.Defaults.RowsPerBatch = int64(raw.Defaults.ChunkSize * raw.Defaults.ChunksPerBatch) - for _, node := range raw.jobs { + for _, node := range raw.Jobs { job := Job{ - JobConfig: raw.defaults, + JobConfig: raw.Defaults, } if err := node.Decode(&job); err != nil { return err } + job.RowsPerBatch = int64(job.ChunkSize * job.ChunksPerBatch) + c.Jobs = append(c.Jobs, job) } return nil } -const defaultConfigFileName string = "config.yml" +const defaultConfigFileName string = "config.yaml" func filenamesOrDefault(filenames []string) []string { if len(filenames) == 0 { @@ -108,7 +111,7 @@ func ReadMigrationConfig(filenames ...string) (MigrationConfig, error) { var config MigrationConfig if err := yaml.Unmarshal(data, &config); err != nil { - log.Fatalf("Error parsing config file: %v", err) + return MigrationConfig{}, fmt.Errorf("Error parsing config file: %v", err) } return config, nil diff --git a/scripts/config-parser/main.go b/scripts/config-parser/main.go index 92f4c9f..fc1b13a 100644 --- a/scripts/config-parser/main.go +++ b/scripts/config-parser/main.go @@ -1,77 +1,17 @@ package main import ( - "gopkg.in/yaml.v3" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + log "github.com/sirupsen/logrus" ) -type RetryConfig struct { - Attempts int `yaml:"attempts"` -} +func main() { + log.SetLevel(log.DebugLevel) -type JobConfig struct { - MaxExtractors int `yaml:"max_extractors"` - MaxLoaders int `yaml:"max_loaders"` - QueueSize int `yaml:"queue_size"` - ChunkSize int `yaml:"chunk_size"` - ChunksPerBatch int `yaml:"chunks_per_batch"` - TruncateTarget bool `yaml:"truncate_target"` - TruncateMethod string `yaml:"truncate_method"` - Retry RetryConfig `yaml:"retry"` -} - -type SourceDbInfo struct { - Schema string `yaml:"schema"` - Table string `yaml:"table"` - PrimaryKey string `yaml:"primary_key"` -} - -type TargetDbInfo struct { - Schema string `yaml:"schema"` - Table string `yaml:"table"` -} - -type Job struct { - Name string `yaml:"name"` - Enabled bool `yaml:"enabled"` - Source SourceDbInfo `yaml:"source"` - Target TargetDbInfo `yaml:"target"` - PreSQL []string `yaml:"pre_sql"` - PostSQL []string `yaml:"post_sql"` - JobConfig `yaml:",inline"` -} - -type MigrationConfig struct { - MaxParallelWorkers int `yaml:"max_parallel_workers"` - Defaults JobConfig `yaml:"defaults"` - Jobs []Job `yaml:"jobs"` -} - -type rawConfig struct { - maxParallelWorkers int `yaml:"max_parallel_workers"` - defaults JobConfig `yaml:"defaults"` - jobs []yaml.Node `yaml:"jobs"` -} - -func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error { - var raw rawConfig - if err := value.Decode(&raw); err != nil { - return err + migrationConfig, err := config.ReadMigrationConfig() + if err != nil { + log.Fatalf("error leyendo configuracion: %v", err) } - c.MaxParallelWorkers = raw.maxParallelWorkers - c.Defaults = raw.defaults - - for _, node := range raw.jobs { - job := Job{ - JobConfig: raw.defaults, - } - - if err := node.Decode(&job); err != nil { - return err - } - - c.Jobs = append(c.Jobs, job) - } - - return nil + log.Debugf("Config: %+v", migrationConfig) }