From 4434054b21afe147116483c9e51c6f7312013b0f Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Wed, 8 Apr 2026 09:57:11 -0500 Subject: [PATCH] feat: implement data generation and loading for MANZANA with improved structure and logging --- cmd/go_migrate/inspect-columns.go | 4 + cmd/go_migrate/log.go | 2 +- cmd/go_migrate/main.go | 6 +- cmd/go_migrate/process.go | 9 +- scripts/mssql-copy-in/main.go | 212 +----------------------- scripts/mssql-copy-in/seed-manzana.go | 227 ++++++++++++++++++++++++++ scripts/mssql-copy-in/utils.go | 86 +++++----- 7 files changed, 287 insertions(+), 259 deletions(-) create mode 100644 scripts/mssql-copy-in/seed-manzana.go diff --git a/cmd/go_migrate/inspect-columns.go b/cmd/go_migrate/inspect-columns.go index b97043c..4c07f46 100644 --- a/cmd/go_migrate/inspect-columns.go +++ b/cmd/go_migrate/inspect-columns.go @@ -242,6 +242,10 @@ ORDER BY c.column_id; return nil, fmt.Errorf("Error scanning column type results: %W", err) } + if strings.HasPrefix(column.name, "graph_id") && column.systemType == "bigint" { + continue + } + colTypes = append(colTypes, MapMssqlColumn(column)) } diff --git a/cmd/go_migrate/log.go b/cmd/go_migrate/log.go index a7bad26..a7afee3 100644 --- a/cmd/go_migrate/log.go +++ b/cmd/go_migrate/log.go @@ -13,5 +13,5 @@ func configureLog() { DisableSorting: false, PadLevelText: true, }) - log.SetLevel(log.InfoLevel) + log.SetLevel(log.DebugLevel) } diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index efba4cb..73228cf 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -14,9 +14,9 @@ type MigrationJob struct { var migrationJobs []MigrationJob = []MigrationJob{ { - Schema: "Cartografia", - Table: "MANZANA", - PrimaryKey: "GDB_ARCHIVE_OID", + Schema: "Red", + Table: "PUERTO", + PrimaryKey: "ID_PUERTO", }, } diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 30bd8f0..901ab01 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -79,16 +79,17 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration }() var wgPostgresLoaders sync.WaitGroup - postgresLoaderCtx := context.Background() + // postgresLoaderCtx := context.Background() log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders) loaderStartTime := time.Now() for range NumLoaders { wgPostgresLoaders.Go(func() { - if err := loadRowsPostgres(postgresLoaderCtx, job, sourceColTypes, targetDb, chRowsTransform); err != nil { - log.Error("Unexpected error loading data into postgres: ", err) - } + // if err := loadRowsPostgres(postgresLoaderCtx, job, sourceColTypes, targetDb, chRowsTransform); err != nil { + // log.Error("Unexpected error loading data into postgres: ", err) + // } + fakeLoader(job, sourceColTypes, chRowsTransform) }) } diff --git a/scripts/mssql-copy-in/main.go b/scripts/mssql-copy-in/main.go index 5ddcd2a..1671861 100644 --- a/scripts/mssql-copy-in/main.go +++ b/scripts/mssql-copy-in/main.go @@ -4,16 +4,11 @@ import ( "context" "database/sql" "fmt" - "math/rand" "sync" "time" - "github.com/gaspardle/go-mssqlclrgeo" - "github.com/google/uuid" mssql "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" - "github.com/twpayne/go-geom" - "github.com/twpayne/go-geom/encoding/wkb" ) const ( @@ -41,53 +36,13 @@ func main() { ctx := context.Background() - maxOid, err := getMaxGDBArchiveOid(ctx, db) - if err != nil { - log.Fatal("Error getting max GDB_ARCHIVE_OID: ", err) - } + var wgSeed sync.WaitGroup - log.Infof("Starting data generation from GDB_ARCHIVE_OID: %d", maxOid+1) - - rowsChan := make(chan []UnknownRowValues, queueSize) - - var wgRowGenerator sync.WaitGroup - - wgRowGenerator.Go(func() { - generateManzanaRows(ctx, maxOid, totalRows, chunkSize, rowsChan) + wgSeed.Go(func() { + seedManzanas(ctx, db) }) - columns := []string{ - "GDB_ARCHIVE_OID", - "ID_MANZANA", - "ID_DISTRITO", - "NOMBRE", - "CODIGO", - "CANTIDAD_TOTAL", - "OCUPACION_RESIDENCIAL", - "OCUPACION_NEGOCIO", - "OCUPACION_DEPARTAMENTO", - "INDICADOR", - "FECHA_ALTA", - "FECHA_ACT", - "Shape", - "GDB_GEOMATTR_DATA", - "GlobalID", - "GDB_FROM_DATE", - "GDB_TO_DATE", - "OBJECTID", - } - - job := MigrationJob{ - Schema: schema, - Table: table, - } - - if err := loadRowsMssql(ctx, job, columns, db, rowsChan); err != nil { - log.Fatal("Error loading rows: ", err) - } - - log.Info("Data generation and loading completed successfully") - wgRowGenerator.Wait() + wgSeed.Wait() } func loadRowsMssql(ctx context.Context, job MigrationJob, colNames []string, db *sql.DB, in <-chan []UnknownRowValues) error { @@ -151,162 +106,3 @@ func loadRowsMssql(ctx context.Context, job MigrationJob, colNames []string, db return nil } - -func generateRandomPolygonWKB() []byte { - minX := rand.Float64()*180 - 90 - minY := rand.Float64()*180 - 90 - - size := 0.01 - - coords := []geom.Coord{ - {minX, minY}, - {minX + size, minY}, - {minX + size, minY + size}, - {minX, minY + size}, - {minX, minY}, - } - - polygon := geom.NewPolygon(geom.XY).MustSetCoords([][]geom.Coord{coords}) - - polygonWkb, _ := wkb.Marshal(polygon, wkb.NDR) - - return polygonWkb -} - -func getMaxGDBArchiveOid(ctx context.Context, db *sql.DB) (int, error) { - var maxOid sql.NullInt64 - - query := fmt.Sprintf(` - SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0) - FROM [%s].[%s] - `, schema, table) - - err := db.QueryRowContext(ctx, query).Scan(&maxOid) - if err != nil && err != sql.ErrNoRows { - return 0, err - } - - if !maxOid.Valid { - return 0, nil - } - - return int(maxOid.Int64), nil -} - -func generateManzanaRows( - ctx context.Context, - startOid int, - totalRows int, - chunkSize int, - out chan<- []UnknownRowValues, -) { - defer close(out) - - rowsGenerated := 0 - currentChunk := make([]UnknownRowValues, 0, chunkSize) - - for i := range totalRows { - gdbArchiveOid := startOid + i + 1 - row := generateManzanaRow(gdbArchiveOid) - currentChunk = append(currentChunk, row) - rowsGenerated++ - - if len(currentChunk) == chunkSize { - select { - case out <- currentChunk: - log.Debugf("Sent chunk with %d rows", len(currentChunk)) - case <-ctx.Done(): - log.Info("Context cancelled, stopping row generation") - return - } - currentChunk = make([]UnknownRowValues, 0, chunkSize) - } - - if rowsGenerated%100_000 == 0 { - logManzanaSampleRow(rowsGenerated, row) - } - } - - if len(currentChunk) > 0 { - select { - case out <- currentChunk: - log.Debugf("Sent final chunk with %d rows", len(currentChunk)) - case <-ctx.Done(): - log.Info("Context cancelled, stopping row generation") - } - } - - log.Infof("Finished generating %d rows", rowsGenerated) -} - -func generateManzanaRow(gdbArchiveOid int) UnknownRowValues { - dateLowerLimit, _ := time.Parse(time.RFC3339, "2020-12-31T23:59:59Z") - dateUpperLimit, _ := time.Parse(time.RFC3339, "2025-12-31T23:59:59Z") - - rowID := gdbArchiveOid - distrito := fmt.Sprintf("D%d", rand.Intn(100)) - nombre := generateRandomString(15) - codigo := generateRandomString(15) - cantidadTotal := rand.Intn(1000) - ocupacionResidencial := rand.Intn(1000) - ocupacionNegocio := rand.Intn(1000) - ocupacionDepartamento := rand.Intn(1000) - indicador := rand.Intn(10000) - fechaAlta := generateRandomTimestamp(dateLowerLimit, dateUpperLimit) - fechaAct := generateRandomTimestamp(dateLowerLimit, dateUpperLimit) - shapeWKB := generateRandomPolygonWKB() - geoData := []byte{} - id := uuid.New() - globalID := id[:] - gdbFromDate := fechaAct - gdbToDate, _ := time.Parse(time.RFC3339, "9999-12-31T23:59:59Z") - objectID := gdbArchiveOid - - shapeMssql, err := mssqlclrgeo.WkbToUdtGeo(shapeWKB, false) - if err != nil { - log.Errorf("Error convirtiendo WKB a formato MSSQL: %v", err) - shapeMssql = []byte{} - } - - return UnknownRowValues{ - gdbArchiveOid, - rowID, - distrito, - nombre, - codigo, - cantidadTotal, - ocupacionResidencial, - ocupacionNegocio, - ocupacionDepartamento, - indicador, - fechaAlta, - fechaAct, - shapeMssql, - geoData, - globalID, - gdbFromDate, - gdbToDate, - objectID, - } -} - -func generateRandomTimestamp(min, max time.Time) time.Time { - minUnix := min.Unix() - maxUnix := max.Unix() - - delta := maxUnix - minUnix - secAleatorios := rand.Int63n(delta) - - return time.Unix(minUnix+secAleatorios, 0) -} - -func generateRandomString(maxLength int) string { - const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - length := min(rand.Intn(maxLength)+1, maxLength) - - b := make([]byte, length) - for i := range b { - b[i] = charset[rand.Intn(len(charset))] - } - return string(b) -} diff --git a/scripts/mssql-copy-in/seed-manzana.go b/scripts/mssql-copy-in/seed-manzana.go new file mode 100644 index 0000000..1c494dd --- /dev/null +++ b/scripts/mssql-copy-in/seed-manzana.go @@ -0,0 +1,227 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/gaspardle/go-mssqlclrgeo" + "github.com/google/uuid" + log "github.com/sirupsen/logrus" +) + +func getMaxGDBArchiveOid(ctx context.Context, db *sql.DB) (int, error) { + var maxOid sql.NullInt64 + + query := fmt.Sprintf(` + SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0) + FROM [%s].[%s] + `, schema, table) + + err := db.QueryRowContext(ctx, query).Scan(&maxOid) + if err != nil && err != sql.ErrNoRows { + return 0, err + } + + if !maxOid.Valid { + return 0, nil + } + + return int(maxOid.Int64), nil +} + +func generateManzanaRows( + ctx context.Context, + startOid int, + totalRows int, + chunkSize int, + out chan<- []UnknownRowValues, +) { + defer close(out) + + rowsGenerated := 0 + currentChunk := make([]UnknownRowValues, 0, chunkSize) + + for i := range totalRows { + gdbArchiveOid := startOid + i + 1 + row := generateManzanaRow(gdbArchiveOid) + currentChunk = append(currentChunk, row) + rowsGenerated++ + + if len(currentChunk) == chunkSize { + select { + case out <- currentChunk: + log.Debugf("Sent chunk with %d rows", len(currentChunk)) + case <-ctx.Done(): + log.Info("Context cancelled, stopping row generation") + return + } + currentChunk = make([]UnknownRowValues, 0, chunkSize) + } + + if rowsGenerated%100_000 == 0 { + logManzanaSampleRow(rowsGenerated, row) + } + } + + if len(currentChunk) > 0 { + select { + case out <- currentChunk: + log.Debugf("Sent final chunk with %d rows", len(currentChunk)) + case <-ctx.Done(): + log.Info("Context cancelled, stopping row generation") + } + } + + log.Infof("Finished generating %d rows", rowsGenerated) +} + +func generateManzanaRow(gdbArchiveOid int) UnknownRowValues { + dateLowerLimit, _ := time.Parse(time.RFC3339, "2020-12-31T23:59:59Z") + dateUpperLimit, _ := time.Parse(time.RFC3339, "2025-12-31T23:59:59Z") + + rowID := gdbArchiveOid + distrito := fmt.Sprintf("D%d", rand.Intn(100)) + nombre := generateRandomString(15) + codigo := generateRandomString(15) + cantidadTotal := rand.Intn(1000) + ocupacionResidencial := rand.Intn(1000) + ocupacionNegocio := rand.Intn(1000) + ocupacionDepartamento := rand.Intn(1000) + indicador := rand.Intn(10000) + fechaAlta := generateRandomTimestamp(dateLowerLimit, dateUpperLimit) + fechaAct := generateRandomTimestamp(dateLowerLimit, dateUpperLimit) + shapeWKB := generateRandomPolygonWKB() + geoData := []byte{} + id := uuid.New() + globalID := id[:] + gdbFromDate := fechaAct + gdbToDate, _ := time.Parse(time.RFC3339, "9999-12-31T23:59:59Z") + objectID := gdbArchiveOid + + shapeMssql, err := mssqlclrgeo.WkbToUdtGeo(shapeWKB, false) + if err != nil { + log.Errorf("Error convirtiendo WKB a formato MSSQL: %v", err) + shapeMssql = []byte{} + } + + return UnknownRowValues{ + gdbArchiveOid, + rowID, + distrito, + nombre, + codigo, + cantidadTotal, + ocupacionResidencial, + ocupacionNegocio, + ocupacionDepartamento, + indicador, + fechaAlta, + fechaAct, + shapeMssql, + geoData, + globalID, + gdbFromDate, + gdbToDate, + objectID, + } +} + +func logManzanaSampleRow(id int, rowValues UnknownRowValues) { + log.Infof(` +Sample row #%d: +GDB_ARCHIVE_OID (%T): %v +ID_MANZANA (%T): %v +ID_DISTRITO (%T): %v +NOMBRE (%T): %v +CODIGO (%T): %v +CANTIDAD_TOTAL (%T): %v +OCUPACION_RESIDENCIAL (%T): %v +OCUPACION_NEGOCIO (%T): %v +OCUPACION_DEPARTAMENTO (%T): %v +INDICADOR (%T): %v +FECHA_ALTA (%T): %v +FECHA_ACT (%T): %v +Shape (%T): %v +GDB_GEOMATTR_DATA (%T): %v +GlobalID (%T): %v +GDB_FROM_DATE (%T): %v +GDB_TO_DATE (%T): %v +OBJECTID (%T): %v +`, + id, + rowValues[0], rowValues[0], + rowValues[1], rowValues[1], + rowValues[2], rowValues[2], + rowValues[3], rowValues[3], + rowValues[4], rowValues[4], + rowValues[5], rowValues[5], + rowValues[6], rowValues[6], + rowValues[7], rowValues[7], + rowValues[8], rowValues[8], + rowValues[9], rowValues[9], + rowValues[10], rowValues[10], + rowValues[11], rowValues[11], + rowValues[12], rowValues[12], + rowValues[13], rowValues[13], + rowValues[14], rowValues[14], + rowValues[15], rowValues[15], + rowValues[16], rowValues[16], + rowValues[17], rowValues[17], + ) +} + +func seedManzanas(ctx context.Context, db *sql.DB) error { + maxOid, err := getMaxGDBArchiveOid(ctx, db) + if err != nil { + log.Fatal("Error getting max GDB_ARCHIVE_OID: ", err) + } + + log.Infof("Starting data generation from GDB_ARCHIVE_OID: %d", maxOid+1) + + rowsChan := make(chan []UnknownRowValues, queueSize) + + var wgRowGenerator sync.WaitGroup + + wgRowGenerator.Go(func() { + generateManzanaRows(ctx, maxOid, totalRows, chunkSize, rowsChan) + }) + + columns := []string{ + "GDB_ARCHIVE_OID", + "ID_MANZANA", + "ID_DISTRITO", + "NOMBRE", + "CODIGO", + "CANTIDAD_TOTAL", + "OCUPACION_RESIDENCIAL", + "OCUPACION_NEGOCIO", + "OCUPACION_DEPARTAMENTO", + "INDICADOR", + "FECHA_ALTA", + "FECHA_ACT", + "Shape", + "GDB_GEOMATTR_DATA", + "GlobalID", + "GDB_FROM_DATE", + "GDB_TO_DATE", + "OBJECTID", + } + + job := MigrationJob{ + Schema: schema, + Table: table, + } + + if err := loadRowsMssql(ctx, job, columns, db, rowsChan); err != nil { + return fmt.Errorf("Error loading rows (MANZANA): %w", err) + } + + log.Info("Data generation and loading completed successfully (MANZANA)") + wgRowGenerator.Wait() + + return nil +} diff --git a/scripts/mssql-copy-in/utils.go b/scripts/mssql-copy-in/utils.go index 7ce5aef..52b6844 100644 --- a/scripts/mssql-copy-in/utils.go +++ b/scripts/mssql-copy-in/utils.go @@ -4,10 +4,12 @@ import ( "context" "database/sql" "fmt" + "math/rand" "time" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - log "github.com/sirupsen/logrus" + "github.com/twpayne/go-geom" + "github.com/twpayne/go-geom/encoding/wkb" ) func connectToSqlServer() (*sql.DB, error) { @@ -36,46 +38,44 @@ func Map[T any, V any](input []T, mapper func(T) V) []V { return result } -func logManzanaSampleRow(id int, rowValues UnknownRowValues) { - log.Infof(` -Sample row #%d: -GDB_ARCHIVE_OID (%T): %v -ID_MANZANA (%T): %v -ID_DISTRITO (%T): %v -NOMBRE (%T): %v -CODIGO (%T): %v -CANTIDAD_TOTAL (%T): %v -OCUPACION_RESIDENCIAL (%T): %v -OCUPACION_NEGOCIO (%T): %v -OCUPACION_DEPARTAMENTO (%T): %v -INDICADOR (%T): %v -FECHA_ALTA (%T): %v -FECHA_ACT (%T): %v -Shape (%T): %v -GDB_GEOMATTR_DATA (%T): %v -GlobalID (%T): %v -GDB_FROM_DATE (%T): %v -GDB_TO_DATE (%T): %v -OBJECTID (%T): %v -`, - id, - rowValues[0], rowValues[0], - rowValues[1], rowValues[1], - rowValues[2], rowValues[2], - rowValues[3], rowValues[3], - rowValues[4], rowValues[4], - rowValues[5], rowValues[5], - rowValues[6], rowValues[6], - rowValues[7], rowValues[7], - rowValues[8], rowValues[8], - rowValues[9], rowValues[9], - rowValues[10], rowValues[10], - rowValues[11], rowValues[11], - rowValues[12], rowValues[12], - rowValues[13], rowValues[13], - rowValues[14], rowValues[14], - rowValues[15], rowValues[15], - rowValues[16], rowValues[16], - rowValues[17], rowValues[17], - ) +func generateRandomPolygonWKB() []byte { + minX := rand.Float64()*180 - 90 + minY := rand.Float64()*180 - 90 + + size := 0.01 + + coords := []geom.Coord{ + {minX, minY}, + {minX + size, minY}, + {minX + size, minY + size}, + {minX, minY + size}, + {minX, minY}, + } + + polygon := geom.NewPolygon(geom.XY).MustSetCoords([][]geom.Coord{coords}) + + polygonWkb, _ := wkb.Marshal(polygon, wkb.NDR) + + return polygonWkb +} + +func generateRandomTimestamp(min, max time.Time) time.Time { + minUnix := min.Unix() + maxUnix := max.Unix() + + delta := maxUnix - minUnix + secAleatorios := rand.Int63n(delta) + + return time.Unix(minUnix+secAleatorios, 0) +} + +func generateRandomString(maxLength int) string { + const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + length := min(rand.Intn(maxLength)+1, maxLength) + + b := make([]byte, length) + for i := range b { + b[i] = charset[rand.Intn(len(charset))] + } + return string(b) }