diff --git a/cmd/go_migrate/dryrun.go b/cmd/go_migrate/dryrun.go new file mode 100644 index 0000000..e496318 --- /dev/null +++ b/cmd/go_migrate/dryrun.go @@ -0,0 +1,112 @@ +package main + +import ( + "context" + "fmt" + "sync" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" + log "github.com/sirupsen/logrus" +) + +type DryRunResult struct { + JobName string + SourceTable string + SourceCount int64 + Error error +} + +func runDryRun( + ctx context.Context, + azureClient *azure.Client, + sourceDb dbwrapper.DbWrapper, + jobs []config.Job, + maxParallelWorkers int, +) { + log.Info("=== DRY RUN ===") + log.Info("[DB] Source connection: OK") + log.Info("[DB] Target connection: OK") + + if azureClient != nil { + if err := azureClient.Ping(ctx); err != nil { + log.Errorf("[STORAGE] Azure: FAIL — %v", err) + } else { + log.Info("[STORAGE] Azure: OK") + } + } else { + log.Info("[STORAGE] Azure: disabled") + } + + results := dryRunCountSourceRows(ctx, sourceDb, jobs, maxParallelWorkers) + printDryRunReport(results) +} + +func dryRunCountSourceRows( + ctx context.Context, + sourceDb dbwrapper.DbWrapper, + jobs []config.Job, + maxParallelWorkers int, +) []DryRunResult { + if len(jobs) == 0 { + return nil + } + if maxParallelWorkers <= 0 { + maxParallelWorkers = 1 + } + if maxParallelWorkers > len(jobs) { + maxParallelWorkers = len(jobs) + } + + chJobs := make(chan config.Job, len(jobs)) + var mu sync.Mutex + var results []DryRunResult + var wg sync.WaitGroup + + for range maxParallelWorkers { + wg.Go(func() { + for job := range chJobs { + res := DryRunResult{ + JobName: job.Name, + SourceTable: fmt.Sprintf("[%s].[%s]", job.SourceTable.Schema, job.SourceTable.Table), + } + count, err := countSourceRows(ctx, sourceDb, job) + if err != nil { + res.Error = err + } else { + res.SourceCount = count + } + mu.Lock() + results = append(results, res) + mu.Unlock() + } + }) + } + + for _, job := range jobs { + chJobs <- job + } + close(chJobs) + wg.Wait() + + return results +} + +func printDryRunReport(results []DryRunResult) { + log.Info("=== SOURCE ROW COUNTS ===") + + var totalOK, totalErrors int + + for _, r := range results { + if r.Error != nil { + log.Errorf("[%s] %s — ERROR: %v", r.JobName, r.SourceTable, r.Error) + totalErrors++ + } else { + log.Infof("[%s] %s — rows: %d", r.JobName, r.SourceTable, r.SourceCount) + totalOK++ + } + } + + log.Infof("=== Dry run complete: %d OK, %d errors ===", totalOK, totalErrors) +} diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index fcd7679..eca532f 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -23,6 +23,7 @@ func main() { configPath := flag.String("config", "", "path to migration config file") validate := flag.Bool("validate", false, "count rows in source and target per job and compare") + dryRun := flag.Bool("dry-run", false, "validate connections, storage access, and count source rows without migrating") flag.Parse() if flag.NArg() > 1 { @@ -45,8 +46,6 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - log.Info("=== Starting migration ===") - var wgConnect errgroup.Group var sourceDb, targetDb dbwrapper.DbWrapper @@ -79,13 +78,29 @@ func main() { defer sourceDb.Close() defer targetDb.Close() + var azureClient *azure.Client + if config.App.AzureStorage.Enabled { + var err error + azureClient, err = azure.NewClient(config.App.AzureStorage) + if err != nil { + log.Fatalf("Failed to create Azure storage client: %v", err) + } + } + + if *dryRun { + runDryRun(ctx, azureClient, sourceDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers) + return + } + if *validate { validationResults := validateJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers) printValidationReport(validationResults) return } - results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers) + log.Info("=== Starting migration ===") + + results := processMigrationJobs(ctx, sourceDb, targetDb, azureClient, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers) log.Info("=== RESUMEN DE MIGRACIÓN ===") var totalProcessed, totalErrors int64 @@ -116,6 +131,7 @@ func processMigrationJobs( ctx context.Context, sourceDb dbwrapper.DbWrapper, targetDb dbwrapper.DbWrapper, + azureClient *azure.Client, jobs []config.Job, maxParallelWorkers int, ) []models.JobResult { @@ -143,15 +159,6 @@ func processMigrationJobs( extractor := extractors.NewExtractor(sourceDb) loader := loaders.NewGenericLoader(targetDb) - var azureClient *azure.Client - if config.App.AzureStorage.Enabled { - var err error - azureClient, err = azure.NewClient(config.App.AzureStorage) - if err != nil { - log.Fatalf("Failed to create Azure storage client: %v", err) - } - } - for i := range maxParallelWorkers { wgJobs.Go(func() { for job := range chJobs { diff --git a/internal/app/azure/main.go b/internal/app/azure/main.go index 6b32a84..96f7041 100644 --- a/internal/app/azure/main.go +++ b/internal/app/azure/main.go @@ -70,6 +70,15 @@ func (c *Client) UploadBuffer(ctx context.Context, containerName, blobPath strin return nil } +func (c *Client) Ping(ctx context.Context) error { + pager := c.client.NewListBlobsFlatPager(c.azureStorageConfig.Container, nil) + _, err := pager.NextPage(ctx) + if err != nil { + return fmt.Errorf("storage access check failed: %w", err) + } + return nil +} + func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []byte) (string, error) { if blobPath == "" || buffer == nil { return "", ErrInvalidInput