package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// BatchRange describes the inclusive primary-key bounds of one migration batch.
type BatchRange struct {
	LowerLimit int
	UpperLimit int
	validRange bool // false marks a sentinel range: table too small to chunk
}

// estimatedRowsPerBatch is the target number of rows migrated per batch.
const estimatedRowsPerBatch = 100_000

// quoteIdent bracket-quotes a SQL Server identifier, doubling any closing
// bracket (the same escaping QUOTENAME performs) so object names cannot
// break out of the quoted context when interpolated into dynamic SQL.
func quoteIdent(name string) string {
	return "[" + strings.ReplaceAll(name, "]", "]]") + "]"
}

// estimateTotalRowsMssql returns an approximate row count for job's table,
// read from sys.partitions metadata (heap / clustered-index partitions,
// index_id 0 or 1). The value is an estimate: partition statistics may lag
// the live table. A missing table surfaces as a wrapped sql.ErrNoRows.
func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, job MigrationJob) (int, error) {
	const query = `
SELECT SUM(p.rows) AS count
FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id
JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name`

	var rowsCount int
	err := db.QueryRowContext(ctx, query,
		sql.Named("schema", job.Schema),
		sql.Named("table", job.Table),
	).Scan(&rowsCount)
	if err != nil {
		return 0, fmt.Errorf("estimating rows for %s.%s: %w", job.Schema, job.Table, err)
	}
	return rowsCount, nil
}

// calculateChunkRangesMssql splits job's table into batchCount contiguous
// primary-key ranges using NTILE and returns the min/max key of each chunk,
// ordered by chunk. Identifiers cannot be bound as parameters in T-SQL, so
// they are interpolated after bracket-escaping via quoteIdent.
func calculateChunkRangesMssql(ctx context.Context, db *sql.DB, job MigrationJob, batchCount int) ([]BatchRange, error) {
	pk := quoteIdent(job.PrimaryKey)
	table := quoteIdent(job.Schema) + "." + quoteIdent(job.Table)
	query := fmt.Sprintf(`
SELECT MIN(%[1]s) AS lower_limit, MAX(%[1]s) AS upper_limit
FROM (SELECT %[1]s, NTILE(@batchCount) OVER (ORDER BY %[1]s) AS chunk_id FROM %[2]s) AS T
GROUP BY chunk_id
ORDER BY chunk_id`, pk, table)

	rows, err := db.QueryContext(ctx, query, sql.Named("batchCount", batchCount))
	if err != nil {
		return nil, fmt.Errorf("querying chunk ranges for %s.%s: %w", job.Schema, job.Table, err)
	}
	defer rows.Close()

	batchRanges := make([]BatchRange, 0, batchCount)
	for rows.Next() {
		br := BatchRange{validRange: true}
		if err := rows.Scan(&br.LowerLimit, &br.UpperLimit); err != nil {
			return nil, fmt.Errorf("scanning chunk range: %w", err)
		}
		batchRanges = append(batchRanges, br)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating chunk ranges: %w", err)
	}
	return batchRanges, nil
}

// calculateBatchMetrics estimates the table size and, when it exceeds one
// batch worth of rows, computes the primary-key range of each batch. Small
// tables yield a single sentinel range with validRange=false, signalling the
// caller to migrate without chunking.
func calculateBatchMetrics(ctx context.Context, db *sql.DB, job MigrationJob) ([]BatchRange, error) {
	rowsCount, err := estimateTotalRowsMssql(ctx, db, job)
	if err != nil {
		return nil, err
	}
	if rowsCount <= estimatedRowsPerBatch {
		// Table fits in a single batch; no key ranges needed.
		return []BatchRange{{validRange: false}}, nil
	}
	// Integer division keeps batches at or above the target size; the last
	// chunk absorbs the remainder.
	batchCount := rowsCount / estimatedRowsPerBatch
	return calculateChunkRangesMssql(ctx, db, job, batchCount)
}