package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/gaspardle/go-mssqlclrgeo"
	"github.com/google/uuid"
	mssql "github.com/microsoft/go-mssqldb"
	log "github.com/sirupsen/logrus"
	"github.com/twpayne/go-geom"
	"github.com/twpayne/go-geom/encoding/wkb"
)

const (
	totalRows int    = 5_000_000 // total synthetic rows to generate and load
	chunkSize int    = 50_000    // rows per bulk-copy transaction
	schema    string = "Cartografia"
	table     string = "MANZANA"
	queueSize int    = 4 // chunks buffered between generator and loader
)

// Timestamp bounds used by generateManzanaRow. Built once at startup instead
// of re-parsing the same RFC3339 strings on every one of the 5M row builds.
var (
	fechaLowerLimit = time.Date(2020, time.December, 31, 23, 59, 59, 0, time.UTC)
	fechaUpperLimit = time.Date(2025, time.December, 31, 23, 59, 59, 0, time.UTC)
	gdbEndOfTime    = time.Date(9999, time.December, 31, 23, 59, 59, 0, time.UTC)
)

// main wires a single row-generator goroutine to the bulk loader through a
// bounded channel and streams totalRows synthetic MANZANA rows into SQL Server.
func main() {
	log.SetFormatter(&log.TextFormatter{
		FullTimestamp:   true,
		TimestampFormat: time.StampMilli,
		DisableSorting:  false,
		PadLevelText:    true,
	})
	log.SetLevel(log.DebugLevel)

	db, connError := connectToSqlServer()
	if connError != nil {
		log.Fatal("Connection error: ", connError)
	}
	// NOTE: deferred Close does not run on the log.Fatal paths below; process
	// exit releases the connection anyway.
	defer db.Close()

	ctx := context.Background()

	maxOid, err := getMaxGDBArchiveOid(ctx, db)
	if err != nil {
		log.Fatal("Error getting max GDB_ARCHIVE_OID: ", err)
	}
	log.Infof("Starting data generation from GDB_ARCHIVE_OID: %d", maxOid+1)

	rowsChan := make(chan []UnknownRowValues, queueSize)

	var wgRowGenerator sync.WaitGroup
	wgRowGenerator.Go(func() {
		generateManzanaRows(ctx, maxOid, totalRows, chunkSize, rowsChan)
	})

	// Column order must match the field order of UnknownRowValues built in
	// generateManzanaRow.
	columns := []string{
		"GDB_ARCHIVE_OID", "ID_MANZANA", "ID_DISTRITO", "NOMBRE", "CODIGO",
		"CANTIDAD_TOTAL", "OCUPACION_RESIDENCIAL", "OCUPACION_NEGOCIO",
		"OCUPACION_DEPARTAMENTO", "INDICADOR", "FECHA_ALTA", "FECHA_ACT",
		"Shape", "GDB_GEOMATTR_DATA", "GlobalID", "GDB_FROM_DATE",
		"GDB_TO_DATE", "OBJECTID",
	}
	job := MigrationJob{
		Schema: schema,
		Table:  table,
	}

	if err := loadRowsMssql(ctx, job, columns, db, rowsChan); err != nil {
		log.Fatal("Error loading rows: ", err)
	}
	log.Info("Data generation and loading completed successfully")
	wgRowGenerator.Wait()
}

// loadRowsMssql drains row chunks from `in` and bulk-copies each chunk into
// [job.Schema].[job.Table] inside its own transaction, logging per-chunk
// throughput. It returns the first error encountered; on success it runs
// until `in` is closed by the producer.
func loadRowsMssql(ctx context.Context, job MigrationJob, colNames []string, db *sql.DB, in <-chan []UnknownRowValues) error {
	// Invariant across chunks; build it once instead of per iteration.
	fullTableName := fmt.Sprintf("[%s].[%s]", job.Schema, job.Table)

	chunkCount := 0
	totalRowsLoaded := 0
	for rows := range in {
		chunkStartTime := time.Now()
		copyDuration, rowsAffected, err := bulkCopyChunk(ctx, db, fullTableName, colNames, rows)
		if err != nil {
			return err
		}

		chunkCount++
		totalRowsLoaded += int(rowsAffected)
		chunkDuration := time.Since(chunkStartTime)
		rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
		log.Infof("Loaded chunk #%d (MSSQL): %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows",
			chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
	}
	return nil
}

// bulkCopyChunk inserts one chunk of rows through the driver's bulk-copy
// protocol inside a dedicated transaction. It returns the copy duration and
// the number of rows the server confirmed. Cleanup on every error path is
// handled by the deferred Rollback/Close calls.
func bulkCopyChunk(ctx context.Context, db *sql.DB, fullTableName string, colNames []string, rows []UnknownRowValues) (time.Duration, int64, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return 0, 0, fmt.Errorf("error starting transaction: %w", err)
	}
	// Harmless no-op (ErrTxDone) once Commit has succeeded; guarantees the
	// transaction is released on every early return.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, colNames...))
	if err != nil {
		return 0, 0, fmt.Errorf("error preparing bulk copy statement: %w", err)
	}
	defer stmt.Close()

	copyStartTime := time.Now()
	for _, row := range rows {
		if _, err := stmt.ExecContext(ctx, row...); err != nil {
			return 0, 0, fmt.Errorf("error executing row insert: %w", err)
		}
	}

	// A final Exec with no arguments flushes the buffered rows to the server.
	result, err := stmt.ExecContext(ctx)
	if err != nil {
		return 0, 0, fmt.Errorf("error flushing bulk data: %w", err)
	}
	if err := stmt.Close(); err != nil {
		return 0, 0, fmt.Errorf("error closing statement: %w", err)
	}
	if err := tx.Commit(); err != nil {
		return 0, 0, fmt.Errorf("error committing transaction: %w", err)
	}

	// RowsAffected error is ignorable: the bulk-copy result reports the count
	// of rows sent, and the log line below is informational only.
	rowsAffected, _ := result.RowsAffected()
	return time.Since(copyStartTime), rowsAffected, nil
}

// generateRandomPolygonWKB builds a small axis-aligned square polygon at a
// random location and returns it encoded as little-endian (NDR) WKB.
func generateRandomPolygonWKB() []byte {
	// Random lower-left corner. Longitude now spans the full [-180, 180)
	// range; the previous *180-90 formula was copy-pasted from the latitude
	// line and only covered half the globe.
	minX := rand.Float64()*360 - 180
	minY := rand.Float64()*180 - 90
	size := 0.01

	// Closed ring: the last coordinate repeats the first.
	coords := []geom.Coord{
		{minX, minY},
		{minX + size, minY},
		{minX + size, minY + size},
		{minX, minY + size},
		{minX, minY},
	}
	polygon := geom.NewPolygon(geom.XY).MustSetCoords([][]geom.Coord{coords})
	// Marshalling a freshly built, valid polygon cannot fail; error ignored.
	polygonWkb, _ := wkb.Marshal(polygon, wkb.NDR)
	return polygonWkb
}

// getMaxGDBArchiveOid returns the current maximum GDB_ARCHIVE_OID in the
// target table, or 0 when the table is empty.
func getMaxGDBArchiveOid(ctx context.Context, db *sql.DB) (int, error) {
	var maxOid sql.NullInt64
	// schema/table are compile-time constants, so Sprintf is not an
	// injection risk here.
	query := fmt.Sprintf(`
		SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0)
		FROM [%s].[%s]
	`, schema, table)
	err := db.QueryRowContext(ctx, query).Scan(&maxOid)
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return 0, err
	}
	if !maxOid.Valid {
		return 0, nil
	}
	return int(maxOid.Int64), nil
}

// generateManzanaRows produces totalRows synthetic rows with OIDs starting at
// startOid+1, batching them into chunks of chunkSize on `out`. The channel is
// always closed on return, whether generation finishes or ctx is cancelled.
func generateManzanaRows(
	ctx context.Context,
	startOid int,
	totalRows int,
	chunkSize int,
	out chan<- []UnknownRowValues,
) {
	defer close(out)

	rowsGenerated := 0
	currentChunk := make([]UnknownRowValues, 0, chunkSize)
	for i := range totalRows {
		gdbArchiveOid := startOid + i + 1
		row := generateManzanaRow(gdbArchiveOid)
		currentChunk = append(currentChunk, row)
		rowsGenerated++

		if len(currentChunk) == chunkSize {
			select {
			case out <- currentChunk:
				log.Debugf("Sent chunk with %d rows", len(currentChunk))
			case <-ctx.Done():
				log.Info("Context cancelled, stopping row generation")
				return
			}
			// The consumer now owns the sent slice; start a fresh one.
			currentChunk = make([]UnknownRowValues, 0, chunkSize)
		}

		// Periodic sample for visual sanity-checking of generated data.
		if rowsGenerated%100_000 == 0 {
			logManzanaSampleRow(rowsGenerated, row)
		}
	}

	// Flush the (possibly partial) final chunk.
	if len(currentChunk) > 0 {
		select {
		case out <- currentChunk:
			log.Debugf("Sent final chunk with %d rows", len(currentChunk))
		case <-ctx.Done():
			log.Info("Context cancelled, stopping row generation")
		}
	}
	log.Infof("Finished generating %d rows", rowsGenerated)
}

// generateManzanaRow builds one synthetic row keyed by gdbArchiveOid, with
// randomized attributes and a random square polygon converted to the SQL
// Server CLR geometry UDT format. The value order must match the `columns`
// slice in main.
func generateManzanaRow(gdbArchiveOid int) UnknownRowValues {
	rowID := gdbArchiveOid
	distrito := fmt.Sprintf("D%d", rand.Intn(100))
	nombre := generateRandomString(15)
	codigo := generateRandomString(15)
	cantidadTotal := rand.Intn(1000)
	ocupacionResidencial := rand.Intn(1000)
	ocupacionNegocio := rand.Intn(1000)
	ocupacionDepartamento := rand.Intn(1000)
	indicador := rand.Intn(10000)
	fechaAlta := generateRandomTimestamp(fechaLowerLimit, fechaUpperLimit)
	fechaAct := generateRandomTimestamp(fechaLowerLimit, fechaUpperLimit)

	shapeWKB := generateRandomPolygonWKB()
	geoData := []byte{}

	id := uuid.New()
	globalID := id[:] // raw 16-byte UUID for the uniqueidentifier column

	gdbFromDate := fechaAct
	gdbToDate := gdbEndOfTime // archive "still current" sentinel: 9999-12-31
	objectID := gdbArchiveOid

	shapeMssql, err := mssqlclrgeo.WkbToUdtGeo(shapeWKB, false)
	if err != nil {
		// Fall back to an empty shape rather than aborting the whole run.
		log.Errorf("Error convirtiendo WKB a formato MSSQL: %v", err)
		shapeMssql = []byte{}
	}

	return UnknownRowValues{
		gdbArchiveOid, rowID, distrito, nombre, codigo,
		cantidadTotal, ocupacionResidencial, ocupacionNegocio, ocupacionDepartamento,
		indicador, fechaAlta, fechaAct, shapeMssql, geoData, globalID,
		gdbFromDate, gdbToDate, objectID,
	}
}

// generateRandomTimestamp returns a uniformly random time in [min, max),
// truncated to whole seconds. When max is not after min it returns min
// instead of panicking inside rand.Int63n.
func generateRandomTimestamp(min, max time.Time) time.Time {
	minUnix := min.Unix()
	maxUnix := max.Unix()
	delta := maxUnix - minUnix
	if delta <= 0 {
		return time.Unix(minUnix, 0)
	}
	return time.Unix(minUnix+rand.Int63n(delta), 0)
}

// generateRandomString returns a random upper-case alphanumeric string of
// length 1..maxLength. maxLength must be >= 1 (rand.Intn panics otherwise).
func generateRandomString(maxLength int) string {
	const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	// rand.Intn(maxLength)+1 already lies in [1, maxLength]; the previous
	// min(...) clamp was redundant.
	length := rand.Intn(maxLength) + 1
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}