feat: implement job configuration structure and YAML parsing for migration jobs

This commit is contained in:
2026-04-09 18:12:11 -05:00
parent adbc962464
commit e8ace6ecf9
4 changed files with 182 additions and 109 deletions

4
go.mod
View File

@@ -4,13 +4,13 @@ go 1.25.7
require ( require (
github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4 github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4
github.com/goccy/go-yaml v1.19.2
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.9.1 github.com/jackc/pgx/v5 v5.9.1
github.com/joho/godotenv v1.5.1 github.com/joho/godotenv v1.5.1
github.com/microsoft/go-mssqldb v1.9.8 github.com/microsoft/go-mssqldb v1.9.8
github.com/sirupsen/logrus v1.9.4 github.com/sirupsen/logrus v1.9.4
github.com/twpayne/go-geom v1.6.1 github.com/twpayne/go-geom v1.6.1
gopkg.in/yaml.v3 v3.0.1
) )
require ( require (
@@ -19,6 +19,8 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect
golang.org/x/crypto v0.48.0 // indirect golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect golang.org/x/sync v0.19.0 // indirect

11
go.sum
View File

@@ -16,13 +16,12 @@ github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZ
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4 h1:4vH4+3zfwZTqoJEFw7DsTaH1V8jgVwnyeDvNi2TxzAc= github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4 h1:4vH4+3zfwZTqoJEFw7DsTaH1V8jgVwnyeDvNi2TxzAc=
github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4/go.mod h1:jlB0I5BIfcJBGdV6rRGPthSBfeY86RGkSAwcsldbHJc= github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4/go.mod h1:jlB0I5BIfcJBGdV6rRGPthSBfeY86RGkSAwcsldbHJc=
github.com/goccy/go-yaml v1.19.2 h1:PmFC1S6h8ljIz6gMRBopkjP1TVT7xuwrButHID66PoM=
github.com/goccy/go-yaml v1.19.2/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
@@ -43,6 +42,10 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/microsoft/go-mssqldb v1.9.8 h1:d4IFMvF/o+HdpXUqbBfzHvn/NlFA75YGcfHUUvDFJEM= github.com/microsoft/go-mssqldb v1.9.8 h1:d4IFMvF/o+HdpXUqbBfzHvn/NlFA75YGcfHUUvDFJEM=
@@ -51,6 +54,8 @@ github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmd
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w=
@@ -73,6 +78,8 @@ golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,115 @@
package config
import (
"fmt"
"os"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
)
type RetryConfig struct {
Attempts int `yaml:"attempts"`
}
type JobConfig struct {
MaxExtractors int `yaml:"max_extractors"`
MaxLoaders int `yaml:"max_loaders"`
QueueSize int `yaml:"queue_size"`
ChunkSize int `yaml:"chunk_size"`
ChunksPerBatch int `yaml:"chunks_per_batch"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
Retry RetryConfig `yaml:"retry"`
}
type SourceDbInfo struct {
Schema string `yaml:"schema"`
Table string `yaml:"table"`
PrimaryKey string `yaml:"primary_key"`
}
type TargetDbInfo struct {
Schema string `yaml:"schema"`
Table string `yaml:"table"`
}
type Job struct {
Name string `yaml:"name"`
Enabled bool `yaml:"enabled"`
Source SourceDbInfo `yaml:"source"`
Target TargetDbInfo `yaml:"target"`
PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"`
JobConfig `yaml:",inline"`
}
type MigrationConfig struct {
MaxParallelWorkers int `yaml:"max_parallel_workers"`
Defaults JobConfig `yaml:"defaults"`
Jobs []Job `yaml:"jobs"`
}
type rawConfig struct {
maxParallelWorkers int `yaml:"max_parallel_workers"`
defaults JobConfig `yaml:"defaults"`
jobs []yaml.Node `yaml:"jobs"`
}
func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
var raw rawConfig
if err := value.Decode(&raw); err != nil {
return err
}
c.MaxParallelWorkers = raw.maxParallelWorkers
c.Defaults = raw.defaults
for _, node := range raw.jobs {
job := Job{
JobConfig: raw.defaults,
}
if err := node.Decode(&job); err != nil {
return err
}
c.Jobs = append(c.Jobs, job)
}
return nil
}
const defaultConfigFileName string = "config.yml"
func filenamesOrDefault(filenames []string) []string {
if len(filenames) == 0 {
return []string{defaultConfigFileName}
}
return filenames
}
func ReadMigrationConfig(filenames ...string) (MigrationConfig, error) {
filenames = filenamesOrDefault(filenames)
var data []byte
var err error
for _, filename := range filenames {
data, err = os.ReadFile(filename)
if err != nil {
continue
}
break
}
if err != nil {
return MigrationConfig{}, fmt.Errorf("Error reading config file: %v", err)
}
var config MigrationConfig
if err := yaml.Unmarshal(data, &config); err != nil {
log.Fatalf("Error parsing config file: %v", err)
}
return config, nil
}

View File

@@ -1,128 +1,77 @@
package main package main
import ( import (
"fmt" "gopkg.in/yaml.v3"
"log"
"os"
"github.com/goccy/go-yaml"
) )
type RetryConfig struct { type RetryConfig struct {
Attempts int `yaml:"attempts"` Attempts int `yaml:"attempts"`
} }
type DBInfo struct { type JobConfig struct {
Schema string `yaml:"schema"` MaxExtractors int `yaml:"max_extractors"`
Table string `yaml:"table"` MaxLoaders int `yaml:"max_loaders"`
PrimaryKey string `yaml:"primary_key,omitempty"` QueueSize int `yaml:"queue_size"`
ChunkSize int `yaml:"chunk_size"`
ChunksPerBatch int `yaml:"chunks_per_batch"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
Retry RetryConfig `yaml:"retry"`
} }
type JobSettings struct { type SourceDbInfo struct {
MaxExtractors *int `yaml:"max_extractors"` Schema string `yaml:"schema"`
MaxLoaders *int `yaml:"max_loaders"` Table string `yaml:"table"`
QueueSize *int `yaml:"queue_size"` PrimaryKey string `yaml:"primary_key"`
ChunkSize *int `yaml:"chunk_size"` }
ChunksPerBatch *int `yaml:"chunks_per_batch"`
TruncateTarget *bool `yaml:"truncate_target"` type TargetDbInfo struct {
TruncateMethod *string `yaml:"truncate_method"` Schema string `yaml:"schema"`
Retry *RetryConfig `yaml:"retry"` Table string `yaml:"table"`
} }
type Job struct { type Job struct {
Name string `yaml:"name"` Name string `yaml:"name"`
Enabled bool `yaml:"enabled"` Enabled bool `yaml:"enabled"`
Source DBInfo `yaml:"source"` Source SourceDbInfo `yaml:"source"`
Target DBInfo `yaml:"target"` Target TargetDbInfo `yaml:"target"`
PreSQL []string `yaml:"pre_sql"` PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"` PostSQL []string `yaml:"post_sql"`
JobSettings `yaml:",inline"` JobConfig `yaml:",inline"`
} }
type Config struct { type MigrationConfig struct {
MaxParallelWorkers int `yaml:"max_parallel_workers"` MaxParallelWorkers int `yaml:"max_parallel_workers"`
Defaults JobSettings `yaml:"defaults"` Defaults JobConfig `yaml:"defaults"`
Jobs []Job `yaml:"jobs"` Jobs []Job `yaml:"jobs"`
} }
func main() { type rawConfig struct {
yamlFile, err := os.ReadFile("config.yaml") maxParallelWorkers int `yaml:"max_parallel_workers"`
if err != nil { defaults JobConfig `yaml:"defaults"`
log.Fatalf("Error leyendo archivo: %v", err) jobs []yaml.Node `yaml:"jobs"`
}
var config Config
err = yaml.Unmarshal(yamlFile, &config)
if err != nil {
log.Fatalf("Error parseando YAML: %v", err)
}
fmt.Printf("Configuración cargada. Trabajos: %d\n", len(config.Jobs))
for i, job := range config.Jobs {
jobPtr := &config.Jobs[i]
if job.MaxExtractors == nil {
jobPtr.MaxExtractors = config.Defaults.MaxExtractors
}
if job.MaxLoaders == nil {
jobPtr.MaxLoaders = config.Defaults.MaxLoaders
}
if job.QueueSize == nil {
jobPtr.QueueSize = config.Defaults.QueueSize
}
if job.ChunkSize == nil {
jobPtr.ChunkSize = config.Defaults.ChunkSize
}
if job.ChunksPerBatch == nil {
jobPtr.ChunksPerBatch = config.Defaults.ChunksPerBatch
}
if job.TruncateTarget == nil {
jobPtr.TruncateTarget = config.Defaults.TruncateTarget
}
if job.TruncateMethod == nil {
jobPtr.TruncateMethod = config.Defaults.TruncateMethod
}
if job.Retry == nil {
jobPtr.Retry = config.Defaults.Retry
}
}
printConfig(config)
} }
func printConfig(config Config) { func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
fmt.Println("Max parallel workers: ", config.MaxParallelWorkers) var raw rawConfig
if err := value.Decode(&raw); err != nil {
fmt.Println("Defaults:") return err
fmt.Printf("\tMaxExtractors: %v\n", *config.Defaults.MaxExtractors)
fmt.Printf("\tMaxLoaders: %v\n", *config.Defaults.MaxLoaders)
fmt.Printf("\tQueueSize: %v\n", *config.Defaults.QueueSize)
fmt.Printf("\tChunkSize: %v\n", *config.Defaults.ChunkSize)
fmt.Printf("\tChunksPerBatch: %v\n", *config.Defaults.ChunksPerBatch)
fmt.Printf("\tTruncateTarget: %v\n", *config.Defaults.TruncateTarget)
fmt.Printf("\tTruncateMethod: %v\n", *config.Defaults.TruncateMethod)
fmt.Printf("\tRetry: %+v\n", *config.Defaults.Retry)
fmt.Println("Jobs:")
for i, job := range config.Jobs {
fmt.Printf("Job Name: %v\n", job.Name)
fmt.Printf("\tEnabled: %v\n", job.Enabled)
fmt.Printf("\tSource: %+v\n", job.Source)
fmt.Printf("\tTarget: %+v\n", job.Target)
fmt.Printf("\tMaxExtractors: %v\n", *job.MaxExtractors)
fmt.Printf("\tMaxLoaders: %v\n", *job.MaxLoaders)
fmt.Printf("\tQueueSize: %v\n", *job.QueueSize)
fmt.Printf("\tChunkSize: %v\n", *job.ChunkSize)
fmt.Printf("\tChunksPerBatch: %v\n", *job.ChunksPerBatch)
fmt.Printf("\tTruncateTarget: %v\n", *job.TruncateTarget)
fmt.Printf("\tTruncateMethod: %v\n", *job.TruncateMethod)
fmt.Printf("\tRetry: %+v\n", *job.Retry)
fmt.Printf("\tPreSQL: %v\n", job.PreSQL)
fmt.Printf("\tPostSQL: %v\n", job.PostSQL)
if i >= 2 {
fmt.Println("Skipping remaining jobs...")
} }
c.MaxParallelWorkers = raw.maxParallelWorkers
c.Defaults = raw.defaults
for _, node := range raw.jobs {
job := Job{
JobConfig: raw.defaults,
} }
if err := node.Decode(&job); err != nil {
return err
}
c.Jobs = append(c.Jobs, job)
}
return nil
} }