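// Package config defines the YAML-backed configuration types for a table
// migration (defaults, jobs, source/target tables, ranges, retry settings) and
// ReadMigrationConfig, which loads them from a file on disk.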
package config
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
|
|
"gopkg.in/yaml.v3"
|
|
)
|
|
|
|
// RetryConfig groups the retry settings read from the "retry" key of a
// JobConfig.
//
// NOTE(review): the Ms suffixes suggest millisecond values, but the code that
// consumes these fields is not in this file; confirm the unit there.
type RetryConfig struct {
	Attempts    int `yaml:"attempts"`
	BaseDelayMs int `yaml:"base_delay_ms"`
	MaxDelayMs  int `yaml:"max_delay_ms"`
	MaxJitterMs int `yaml:"max_jitter_ms"`
}
|
|
|
|
type JobConfig struct {
|
|
MaxExtractors int `yaml:"max_extractors"`
|
|
MaxLoaders int `yaml:"max_loaders"`
|
|
QueueSize int `yaml:"queue_size"`
|
|
BatchSize int `yaml:"batch_size"`
|
|
BatchesPerPartition int `yaml:"batches_per_partition"`
|
|
TruncateTarget bool `yaml:"truncate_target"`
|
|
TruncateMethod string `yaml:"truncate_method"`
|
|
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
|
|
MaxChunkErrors int `yaml:"max_chunk_errors"`
|
|
Retry RetryConfig `yaml:"retry"`
|
|
RowsPerPartition int64
|
|
}
|
|
|
|
// TableInfo identifies a database table by its schema and table name, read
// from the "schema" and "table" YAML keys.
type TableInfo struct {
	Schema string `yaml:"schema"`
	Table  string `yaml:"table"`
}
|
|
|
|
type SourceTableInfo struct {
|
|
TableInfo `yaml:",inline"`
|
|
PrimaryKey string `yaml:"primary_key"`
|
|
}
|
|
|
|
type TargetTableInfo struct {
|
|
TableInfo `yaml:",inline"`
|
|
PreSQL []string `yaml:"pre_sql"`
|
|
PostSQL []string `yaml:"post_sql"`
|
|
}
|
|
|
|
// RangeConfig describes an int64 interval with independently inclusive or
// exclusive endpoints, read from the "range" key of a job.
//
// NOTE(review): presumably this bounds the source primary-key values a job
// processes; the consumer is not in this file — confirm.
type RangeConfig struct {
	Min            int64 `yaml:"min"`
	Max            int64 `yaml:"max"`
	IsMinInclusive bool  `yaml:"is_min_inclusive"`
	IsMaxInclusive bool  `yaml:"is_max_inclusive"`
}
|
|
|
|
type Job struct {
|
|
Name string `yaml:"name"`
|
|
Enabled bool `yaml:"enabled"`
|
|
SourceTable SourceTableInfo `yaml:"source"`
|
|
TargetTable TargetTableInfo `yaml:"target"`
|
|
JobConfig `yaml:",inline"`
|
|
Range RangeConfig `yaml:"range"`
|
|
}
|
|
|
|
type MigrationConfig struct {
|
|
MaxParallelWorkers int `yaml:"max_parallel_workers"`
|
|
SourceDbType string `yaml:"source_db_type"`
|
|
TargetDbType string `yaml:"target_db_type"`
|
|
Defaults JobConfig `yaml:"defaults"`
|
|
Jobs []Job `yaml:"jobs"`
|
|
}
|
|
|
|
type rawConfig struct {
|
|
MaxParallelWorkers int `yaml:"max_parallel_workers"`
|
|
SourceDbType string `yaml:"source_db_type"`
|
|
TargetDbType string `yaml:"target_db_type"`
|
|
Defaults JobConfig `yaml:"defaults"`
|
|
Jobs []yaml.Node `yaml:"jobs"`
|
|
}
|
|
|
|
func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
|
|
var raw rawConfig
|
|
if err := value.Decode(&raw); err != nil {
|
|
return err
|
|
}
|
|
|
|
c.MaxParallelWorkers = raw.MaxParallelWorkers
|
|
c.Defaults = raw.Defaults
|
|
c.SourceDbType = raw.SourceDbType
|
|
c.TargetDbType = raw.TargetDbType
|
|
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
|
|
|
|
for _, node := range raw.Jobs {
|
|
job := Job{
|
|
JobConfig: raw.Defaults,
|
|
}
|
|
|
|
if err := node.Decode(&job); err != nil {
|
|
return err
|
|
}
|
|
|
|
job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition)
|
|
|
|
c.Jobs = append(c.Jobs, job)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// defaultConfigFileName is the file consulted when no file name is supplied.
const defaultConfigFileName string = "config.yaml"

// filenamesOrDefault returns filenames unchanged when it has at least one
// element; otherwise it returns a one-element slice holding
// defaultConfigFileName.
func filenamesOrDefault(filenames []string) []string {
	if len(filenames) > 0 {
		return filenames
	}
	return []string{defaultConfigFileName}
}
|
|
|
|
func ReadMigrationConfig(filenames ...string) (MigrationConfig, error) {
|
|
filenames = filenamesOrDefault(filenames)
|
|
var data []byte
|
|
var err error
|
|
|
|
for _, filename := range filenames {
|
|
data, err = os.ReadFile(filename)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
return MigrationConfig{}, fmt.Errorf("Error reading config file: %v", err)
|
|
}
|
|
|
|
var config MigrationConfig
|
|
if err := yaml.Unmarshal(data, &config); err != nil {
|
|
return MigrationConfig{}, fmt.Errorf("Error parsing config file: %v", err)
|
|
}
|
|
|
|
return config, nil
|
|
}
|