// Package config declares the YAML-backed configuration types for migration
// jobs and the helpers that load them from disk.
package config

import (
	"fmt"
	"os"

	"gopkg.in/yaml.v3"
)

// RetryConfig holds the settings read from a "retry" mapping.
//
// NOTE(review): the *Ms fields are presumably durations in milliseconds;
// confirm against the code that consumes them.
type RetryConfig struct {
	Attempts    int `yaml:"attempts"`
	BaseDelayMs int `yaml:"base_delay_ms"`
	MaxDelayMs  int `yaml:"max_delay_ms"`
	MaxJitterMs int `yaml:"max_jitter_ms"`
}

// ToStorageColumnConfig is one entry of the "to_storage.columns" list: a
// source name, a target name and a mode string.
type ToStorageColumnConfig struct {
	Source string `yaml:"source"`
	Target string `yaml:"target"`
	Mode   string `yaml:"mode"`
}

// ToStorageConfig holds the "to_storage" mapping.
type ToStorageConfig struct {
	Columns []ToStorageColumnConfig `yaml:"columns"`
}

// JobConfig holds the tunables that can be set once under "defaults" and
// overridden per job (it is inlined into Job).
//
// RowsPerPartition is derived, not configured: MigrationConfig.UnmarshalYAML
// overwrites it with BatchSize * BatchesPerPartition after decoding.
//
// NOTE: the spelling of MaxPartitionErrrors and of its YAML key
// "max_partition_errrors" (three r's) is part of the existing public field
// name and file format, so it is kept as is.
type JobConfig struct {
	MaxExtractors       int         `yaml:"max_extractors"`
	MaxLoaders          int         `yaml:"max_loaders"`
	QueueSize           int         `yaml:"queue_size"`
	BatchSize           int         `yaml:"batch_size"`
	BatchesPerPartition int         `yaml:"batches_per_partition"`
	TruncateTarget      bool        `yaml:"truncate_target"`
	TruncateMethod      string      `yaml:"truncate_method"`
	MaxPartitionErrrors int         `yaml:"max_partition_errrors"`
	MaxChunkErrors      int         `yaml:"max_chunk_errors"`
	Retry               RetryConfig `yaml:"retry"`
	RowsPerPartition    int64
	ToStorage           ToStorageConfig `yaml:"to_storage"`
}

// TableInfo identifies a table by schema and table name.
type TableInfo struct {
	Schema string `yaml:"schema"`
	Table  string `yaml:"table"`
}

// SourceTableInfo is the "source" mapping of a job: the inlined TableInfo
// keys plus "primary_key".
type SourceTableInfo struct {
	TableInfo  `yaml:",inline"`
	PrimaryKey string `yaml:"primary_key"`
}

// TargetTableInfo is the "target" mapping of a job: the inlined TableInfo
// keys plus the "pre_sql" and "post_sql" string lists.
type TargetTableInfo struct {
	TableInfo `yaml:",inline"`
	PreSQL    []string `yaml:"pre_sql"`
	PostSQL   []string `yaml:"post_sql"`
}

// RangeConfig is the "range" mapping of a job: two int64 bounds and a flag per
// bound saying whether it is inclusive.
type RangeConfig struct {
	Min            int64 `yaml:"min"`
	Max            int64 `yaml:"max"`
	IsMinInclusive bool  `yaml:"is_min_inclusive"`
	IsMaxInclusive bool  `yaml:"is_max_inclusive"`
}

// Job is one entry of the "jobs" list. JobConfig is inlined, so its keys sit
// at the same level as "name", "enabled", "source", "target" and "range".
type Job struct {
	Name        string          `yaml:"name"`
	Enabled     bool            `yaml:"enabled"`
	SourceTable SourceTableInfo `yaml:"source"`
	TargetTable TargetTableInfo `yaml:"target"`
	JobConfig   `yaml:",inline"`
	Range       RangeConfig `yaml:"range"`
}

// MigrationConfig is the top-level document. It is decoded through
// UnmarshalYAML so that every job starts from Defaults.
type MigrationConfig struct {
	MaxParallelWorkers int       `yaml:"max_parallel_workers"`
	SourceDbType       string    `yaml:"source_db_type"`
	TargetDbType       string    `yaml:"target_db_type"`
	Defaults           JobConfig `yaml:"defaults"`
	Jobs               []Job     `yaml:"jobs"`
}

// rawConfig mirrors MigrationConfig but keeps each job as an undecoded
// yaml.Node, so UnmarshalYAML can decode it on top of the defaults.
type rawConfig struct {
	MaxParallelWorkers int         `yaml:"max_parallel_workers"`
	SourceDbType       string      `yaml:"source_db_type"`
	TargetDbType       string      `yaml:"target_db_type"`
	Defaults           JobConfig   `yaml:"defaults"`
	Jobs               []yaml.Node `yaml:"jobs"`
}

// UnmarshalYAML decodes value into c.
//
// Each job is pre-filled with the decoded "defaults" section and then decoded
// from its own node, so keys present on the job override the defaults.
// RowsPerPartition is recomputed for the defaults and for every job as
// BatchSize * BatchesPerPartition.
//
// c is only modified once the whole document has decoded successfully, and
// c.Jobs is replaced rather than appended to.
func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
	var raw rawConfig
	if err := value.Decode(&raw); err != nil {
		return err
	}

	defaults := raw.Defaults
	// Widen before multiplying: the product of two ints can overflow int on
	// 32-bit platforms even though the result is stored in an int64.
	defaults.RowsPerPartition = int64(defaults.BatchSize) * int64(defaults.BatchesPerPartition)

	// Keep Jobs nil when the document lists no jobs.
	var jobs []Job
	if len(raw.Jobs) > 0 {
		jobs = make([]Job, 0, len(raw.Jobs))
	}
	for i := range raw.Jobs {
		job := Job{JobConfig: raw.Defaults}
		if err := raw.Jobs[i].Decode(&job); err != nil {
			return err
		}
		job.RowsPerPartition = int64(job.BatchSize) * int64(job.BatchesPerPartition)
		jobs = append(jobs, job)
	}

	c.MaxParallelWorkers = raw.MaxParallelWorkers
	c.SourceDbType = raw.SourceDbType
	c.TargetDbType = raw.TargetDbType
	c.Defaults = defaults
	c.Jobs = jobs
	return nil
}

// defaultConfigFileName is the file read when ReadMigrationConfig is called
// without arguments.
const defaultConfigFileName string = "config.yaml"

// filenamesOrDefault returns filenames unchanged, or a one-element slice with
// defaultConfigFileName when filenames is empty.
func filenamesOrDefault(filenames []string) []string {
	if len(filenames) == 0 {
		return []string{defaultConfigFileName}
	}
	return filenames
}

// ReadMigrationConfig loads a MigrationConfig from the first of filenames that
// can be read, trying them in order. With no arguments it reads "config.yaml".
//
// If no file can be read, the returned error wraps the read error of the last
// candidate. If the file cannot be parsed, the returned error wraps the YAML
// error. In both cases the returned MigrationConfig is the zero value.
func ReadMigrationConfig(filenames ...string) (MigrationConfig, error) {
	var (
		data []byte
		err  error
	)
	for _, filename := range filenamesOrDefault(filenames) {
		data, err = os.ReadFile(filename)
		if err == nil {
			break
		}
	}
	if err != nil {
		return MigrationConfig{}, fmt.Errorf("Error reading config file: %w", err)
	}

	var config MigrationConfig
	if err := yaml.Unmarshal(data, &config); err != nil {
		return MigrationConfig{}, fmt.Errorf("Error parsing config file: %w", err)
	}
	return config, nil
}