package main import ( "context" "database/sql" "sync" "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "github.com/jackc/pgx/v5/pgxpool" log "github.com/sirupsen/logrus" ) func main() { configureLog() migrationConfig, err := config.ReadMigrationConfig() if err != nil { log.Fatalf("error leyendo configuracion: %v", err) } log.Debugf("Config: %+v", migrationConfig) startTime := time.Now() ctx, cancel := context.WithCancel(context.Background()) defer cancel() log.Info("=== Starting migration ===") sourceDb, targetDb, connError := connectToDatabases() if connError != nil { log.Fatal("Connection error: ", connError) } defer sourceDb.Close() defer targetDb.Close() results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers) log.Info("=== RESUMEN DE MIGRACIÓN ===") var totalProcessed, totalErrors int64 for _, res := range results { status := "OK" if res.Error != nil { status = "FAILED" } log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration) totalProcessed += res.RowsLoaded if res.Error != nil { totalErrors++ } } log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed) totalDuration := time.Since(startTime) log.Infof("=== Migration completed successfully! ===") log.Infof("Total migration time: %v", totalDuration) } func processMigrationJobs( ctx context.Context, sourceDb *sql.DB, targetDb *pgxpool.Pool, jobs []config.Job, maxParallelWorkers int, ) []JobResult { if len(jobs) == 0 { log.Info("No migration jobs configured") return []JobResult{} } if maxParallelWorkers <= 0 { maxParallelWorkers = 1 } if maxParallelWorkers > len(jobs) { maxParallelWorkers = len(jobs) } log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers) chJobResults := make(chan JobResult, len(jobs)) chJobs := make(chan config.Job, len(jobs)) var wgJobs sync.WaitGroup extractor := extractors.NewMssqlExtractor(sourceDb) transformer := transformers.NewMssqlTransformer() loader := loaders.NewPostgresLoader(targetDb) for i := range maxParallelWorkers { wgJobs.Go(func() { for job := range chJobs { log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table) res := processMigrationJob( ctx, sourceDb, targetDb, extractor, transformer, loader, job, ) chJobResults <- res } }) } for _, job := range jobs { chJobs <- job } close(chJobs) go func() { wgJobs.Wait() close(chJobResults) }() var finalResults []JobResult for res := range chJobResults { finalResults = append(finalResults, res) } return finalResults }