diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go index d2f0a88..48d3c32 100644 --- a/cmd/go_migrate/main.go +++ b/cmd/go_migrate/main.go @@ -23,7 +23,7 @@ var migrationJobs []MigrationJob = []MigrationJob{ const ( NumExtractors int = 1 NumLoaders int = 4 - ChunkSize int = 100000 + ChunkSize int = 50000 QueueSize int = 10 ) diff --git a/go.mod b/go.mod index b6526e4..e470a39 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,18 @@ module git.ksdemosapps.com/kylesoda/go-migrate go 1.25.7 require ( + github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.9.1 github.com/joho/godotenv v1.5.1 github.com/microsoft/go-mssqldb v1.9.8 github.com/sirupsen/logrus v1.9.4 + github.com/twpayne/go-geom v1.6.1 ) require ( github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect diff --git a/go.sum b/go.sum index 14a1c24..2bf97bb 100644 --- a/go.sum +++ b/go.sum @@ -10,9 +10,17 @@ github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0 h1:nCYfg github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0/go.mod h1:ucUjca2JtSZboY8IoUqyQyuuXvwbMBVwFOm0vdQPNhA= github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgvJqCH0sFfrBUTnUJSBrBf7++ypk+twtRs= github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= +github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= +github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4 h1:4vH4+3zfwZTqoJEFw7DsTaH1V8jgVwnyeDvNi2TxzAc= +github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4/go.mod h1:jlB0I5BIfcJBGdV6rRGPthSBfeY86RGkSAwcsldbHJc= github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= @@ -21,6 +29,8 @@ github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -48,6 +58,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/twpayne/go-geom v1.6.1 h1:iLE+Opv0Ihm/ABIcvQFGIiFBXd76oBIar9drAwHFhR4= +github.com/twpayne/go-geom v1.6.1/go.mod h1:Kr+Nly6BswFsKM5sd31YaoWS5PeDDH2NftJTK7Gd028= golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= diff --git a/scripts/mssql-copy-in/main.go b/scripts/mssql-copy-in/main.go index 847cb4e..57ab4be 100644 --- a/scripts/mssql-copy-in/main.go +++ b/scripts/mssql-copy-in/main.go @@ -1,101 +1,312 @@ -//go:build go1.10 -// +build go1.10 - package main import ( + "context" "database/sql" - "log" - "strings" - "unicode/utf8" + "fmt" + "math/rand" + "sync" + "time" + "github.com/gaspardle/go-mssqlclrgeo" + "github.com/google/uuid" mssql "github.com/microsoft/go-mssqldb" + log "github.com/sirupsen/logrus" + "github.com/twpayne/go-geom" + "github.com/twpayne/go-geom/encoding/wkb" ) const ( - createTestTable = `CREATE TABLE test_table( - id int IDENTITY(1,1) NOT NULL, - test_nvarchar nvarchar(50) NULL, - test_varchar varchar(50) NULL, - test_float float NULL, - test_datetime2_3 datetime2(3) NULL, - test_bitn bit NULL, - test_bigint bigint NOT NULL, - test_geom geometry NULL, - CONSTRAINT PK_table_test_id PRIMARY KEY CLUSTERED - ( - id ASC - ) ON [PRIMARY]);` - dropTestTable = "IF OBJECT_ID('test_table', 'U') IS NOT NULL DROP TABLE test_table;" + totalRows int = 5_000_000 + chunkSize int = 50_000 + schema string = "Cartografia" + table string = "MANZANA" + queueSize int = 4 ) -// This example shows how to perform bulk imports func main() { - db, err := sql.Open("sqlserver", "") - if err != nil { - log.Fatal("Open connection failed:", err.Error()) + log.SetFormatter(&log.TextFormatter{ + FullTimestamp: true, + TimestampFormat: time.StampMilli, + DisableSorting: false, + PadLevelText: true, + }) + log.SetLevel(log.DebugLevel) + + db, connError := connectToSqlServer() + if connError != nil { + log.Fatal("Connection error: ", connError) } defer db.Close() - txn, err := db.Begin() + ctx := context.Background() + + maxOid, err := getMaxGDBArchiveOid(ctx, db) if err != nil { - log.Fatal(err) + log.Fatal("Error getting max GDB_ARCHIVE_OID: ", err) } - // Create table - _, err = db.Exec(createTestTable) - if err != nil { - log.Fatal(err) - } - defer db.Exec(dropTestTable) + log.Infof("Starting data generation from GDB_ARCHIVE_OID: %d", maxOid+1) - // mssqldb.CopyIn creates string to be consumed by Prepare - stmt, err := txn.Prepare(mssql.CopyIn("test_table", mssql.BulkOptions{}, "test_varchar", "test_nvarchar", "test_float", "test_bigint")) - if err != nil { - log.Fatal(err.Error()) + rowsChan := make(chan []UnknownRowValues, queueSize) + + var wgRowGenerator sync.WaitGroup + + wgRowGenerator.Go(func() { + generateManzanaRows(ctx, maxOid, totalRows, chunkSize, rowsChan) + }) + + columns := []string{ + "GDB_ARCHIVE_OID", + "ID_MANZANA", + "ID_DISTRITO", + "NOMBRE", + "CODIGO", + "CANTIDAD_TOTAL", + "OCUPACION_RESIDENCIAL", + "OCUPACION_NEGOCIO", + "OCUPACION_DEPARTAMENTO", + "INDICADOR", + "FECHA_ALTA", + "FECHA_ACT", + "Shape", + "GDB_GEOMATTR_DATA", + "GlobalID", + "GDB_FROM_DATE", + "GDB_TO_DATE", + "OBJECTID", } - for i := 0; i < 10; i++ { - _, err = stmt.Exec(generateString(0, 30), generateStringUnicode(0, 30), i, i) + job := MigrationJob{ + Schema: schema, + Table: table, + } + + if err := loadRowsMssql(ctx, job, columns, db, rowsChan); err != nil { + log.Fatal("Error loading rows: ", err) + } + + log.Info("Data generation and loading completed successfully") + wgRowGenerator.Wait() +} + +func loadRowsMssql(ctx context.Context, job MigrationJob, colNames []string, db *sql.DB, in <-chan []UnknownRowValues) error { + chunkCount := 0 + totalRowsLoaded := 0 + + for rows := range in { + chunkStartTime := time.Now() + + tx, err := db.BeginTx(ctx, nil) if err != nil { - log.Fatal(err.Error()) + return fmt.Errorf("error starting transaction: %w", err) + } + + fullTableName := fmt.Sprintf("[%s].[%s]", job.Schema, job.Table) + + stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, colNames...)) + if err != nil { + tx.Rollback() + return fmt.Errorf("error preparing bulk copy statement: %w", err) + } + + copyStartTime := time.Now() + + for _, row := range rows { + _, err = stmt.ExecContext(ctx, row...) + if err != nil { + stmt.Close() + tx.Rollback() + return fmt.Errorf("error executing row insert: %w", err) + } + } + + result, err := stmt.ExecContext(ctx) + if err != nil { + stmt.Close() + tx.Rollback() + return fmt.Errorf("error flushing bulk data: %w", err) + } + + err = stmt.Close() + if err != nil { + tx.Rollback() + return fmt.Errorf("error closing statement: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("error committing transaction: %w", err) + } + + rowsAffected, _ := result.RowsAffected() + chunkCount++ + totalRowsLoaded += int(rowsAffected) + + copyDuration := time.Since(copyStartTime) + chunkDuration := time.Since(chunkStartTime) + rowsPerSec := float64(len(rows)) / chunkDuration.Seconds() + + log.Infof("Loaded chunk #%d (MSSQL): %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded) + } + + return nil +} + +func generateRandomPolygonWKB() []byte { + minX := rand.Float64()*180 - 90 + minY := rand.Float64()*180 - 90 + + size := 0.01 + + coords := []geom.Coord{ + {minX, minY}, + {minX + size, minY}, + {minX + size, minY + size}, + {minX, minY + size}, + {minX, minY}, + } + + polygon := geom.NewPolygon(geom.XY).MustSetCoords([][]geom.Coord{coords}) + + polygonWkb, _ := wkb.Marshal(polygon, wkb.NDR) + + return polygonWkb +} + +func getMaxGDBArchiveOid(ctx context.Context, db *sql.DB) (int, error) { + var maxOid sql.NullInt64 + + query := fmt.Sprintf(` + SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0) + FROM [%s].[%s] + `, schema, table) + + err := db.QueryRowContext(ctx, query).Scan(&maxOid) + if err != nil && err != sql.ErrNoRows { + return 0, err + } + + if !maxOid.Valid { + return 0, nil + } + + return int(maxOid.Int64), nil +} + +func generateManzanaRows( + ctx context.Context, + startOid int, + totalRows int, + chunkSize int, + out chan<- []UnknownRowValues, +) { + defer close(out) + + rowsGenerated := 0 + currentChunk := make([]UnknownRowValues, 0, chunkSize) + + for i := range totalRows { + gdbArchiveOid := startOid + i + 1 + row := generateManzanaRow(gdbArchiveOid) + currentChunk = append(currentChunk, row) + rowsGenerated++ + + if len(currentChunk) == chunkSize { + select { + case out <- currentChunk: + log.Debugf("Sent chunk with %d rows", len(currentChunk)) + case <-ctx.Done(): + log.Info("Context cancelled, stopping row generation") + return + } + currentChunk = make([]UnknownRowValues, 0, chunkSize) + } + + if rowsGenerated%100_000 == 0 { + logManzanaSampleRow(rowsGenerated, row) } } - result, err := stmt.Exec() - if err != nil { - log.Fatal(err) + if len(currentChunk) > 0 { + select { + case out <- currentChunk: + log.Debugf("Sent final chunk with %d rows", len(currentChunk)) + case <-ctx.Done(): + log.Info("Context cancelled, stopping row generation") + } } - err = stmt.Close() - if err != nil { - log.Fatal(err) - } - - err = txn.Commit() - if err != nil { - log.Fatal(err) - } - rowCount, _ := result.RowsAffected() - log.Printf("%d row copied\n", rowCount) - log.Printf("bye\n") + log.Infof("Finished generating %d rows", rowsGenerated) } -func generateString(x int, n int) string { - letters := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - b := make([]byte, n) +func generateManzanaRow(gdbArchiveOid int) UnknownRowValues { + dateLowerLimit, _ := time.Parse(time.RFC3339, "2020-12-31T23:59:59Z") + dateUpperLimit, _ := time.Parse(time.RFC3339, "2025-12-31T23:59:59Z") + + rowID := gdbArchiveOid + distrito := fmt.Sprintf("D%d", rand.Intn(100)) + nombre := generateRandomString(15) + codigo := generateRandomString(15) + cantidadTotal := rand.Intn(1000) + ocupacionResidencial := rand.Intn(1000) + ocupacionNegocio := rand.Intn(1000) + ocupacionDepartamento := rand.Intn(1000) + indicador := rand.Intn(10000) + fechaAlta := generateRandomTimestamp(dateLowerLimit, dateUpperLimit) + fechaAct := generateRandomTimestamp(dateLowerLimit, dateUpperLimit) + shapeWKB := generateRandomPolygonWKB() + geoData := []byte{} + id := uuid.New() + globalID := id[:] + gdbFromDate := fechaAct + gdbToDate, _ := time.Parse(time.RFC3339, "9999-12-31T23:59:59Z") + objectID := gdbArchiveOid + + shapeMssql, err := mssqlclrgeo.WkbToUdtGeo(shapeWKB, false) + if err != nil { + log.Errorf("Error convirtiendo WKB a formato MSSQL: %v", err) + shapeMssql = []byte{} + } + + return UnknownRowValues{ + gdbArchiveOid, + rowID, + distrito, + nombre, + codigo, + cantidadTotal, + ocupacionResidencial, + ocupacionNegocio, + ocupacionDepartamento, + indicador, + fechaAlta, + fechaAct, + shapeMssql, + geoData, + globalID, + gdbFromDate, + gdbToDate, + objectID, + } +} + +func generateRandomTimestamp(min, max time.Time) time.Time { + minUnix := min.Unix() + maxUnix := max.Unix() + + delta := maxUnix - minUnix + secAleatorios := rand.Int63n(delta) + + return time.Unix(minUnix+secAleatorios, 0) +} + +func generateRandomString(maxLength int) string { + const charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + length := min(rand.Intn(maxLength)+1, maxLength) + + b := make([]byte, length) for i := range b { - b[i] = letters[(x+i)%len(letters)] + b[i] = charset[rand.Intn(len(charset))] } return string(b) } -func generateStringUnicode(x int, n int) string { - letters := []byte("ab©💾é?ghïjklmnopqЯ☀tuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - b := &strings.Builder{} - for i := 0; i < n; i++ { - r, sz := utf8.DecodeRune(letters[x%len(letters):]) - x += sz - b.WriteRune(r) - } - return b.String() -} diff --git a/scripts/mssql-copy-in/types.go b/scripts/mssql-copy-in/types.go new file mode 100644 index 0000000..31d6ff2 --- /dev/null +++ b/scripts/mssql-copy-in/types.go @@ -0,0 +1,52 @@ +package main + +type ColumnType struct { + name string + + hasMaxLength bool + hasPrecisionScale bool + + userType string + systemType string + unifiedType string + nullable bool + maxLength int64 + precision int64 + scale int64 +} + +func (c *ColumnType) Name() string { + return c.name +} + +func (c *ColumnType) UserType() string { + return c.userType +} + +func (c *ColumnType) SystemType() string { + return c.systemType +} + +func (c *ColumnType) Length() (length int64, ok bool) { + return c.maxLength, c.hasMaxLength +} + +func (c *ColumnType) DecimalSize() (precision, scale int64, ok bool) { + return c.precision, c.scale, c.hasPrecisionScale +} + +func (c *ColumnType) Nullable() bool { + return c.nullable +} + +func (c *ColumnType) Type() string { + return c.unifiedType +} + +type MigrationJob struct { + Schema string + Table string + PrimaryKey string +} + +type UnknownRowValues = []any diff --git a/scripts/mssql-copy-in/utils.go b/scripts/mssql-copy-in/utils.go new file mode 100644 index 0000000..7ce5aef --- /dev/null +++ b/scripts/mssql-copy-in/utils.go @@ -0,0 +1,81 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "time" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + log "github.com/sirupsen/logrus" +) + +func connectToSqlServer() (*sql.DB, error) { + db, err := sql.Open("sqlserver", config.App.SourceDbUrl) + if err != nil { + return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + if err := db.PingContext(ctx); err != nil { + return nil, fmt.Errorf("Unable to ping sqlserver: %w", err) + } + + return db, nil +} + +func Map[T any, V any](input []T, mapper func(T) V) []V { + result := make([]V, len(input)) + + for i, v := range input { + result[i] = mapper(v) + } + + return result +} + +func logManzanaSampleRow(id int, rowValues UnknownRowValues) { + log.Infof(` +Sample row #%d: +GDB_ARCHIVE_OID (%T): %v +ID_MANZANA (%T): %v +ID_DISTRITO (%T): %v +NOMBRE (%T): %v +CODIGO (%T): %v +CANTIDAD_TOTAL (%T): %v +OCUPACION_RESIDENCIAL (%T): %v +OCUPACION_NEGOCIO (%T): %v +OCUPACION_DEPARTAMENTO (%T): %v +INDICADOR (%T): %v +FECHA_ALTA (%T): %v +FECHA_ACT (%T): %v +Shape (%T): %v +GDB_GEOMATTR_DATA (%T): %v +GlobalID (%T): %v +GDB_FROM_DATE (%T): %v +GDB_TO_DATE (%T): %v +OBJECTID (%T): %v +`, + id, + rowValues[0], rowValues[0], + rowValues[1], rowValues[1], + rowValues[2], rowValues[2], + rowValues[3], rowValues[3], + rowValues[4], rowValues[4], + rowValues[5], rowValues[5], + rowValues[6], rowValues[6], + rowValues[7], rowValues[7], + rowValues[8], rowValues[8], + rowValues[9], rowValues[9], + rowValues[10], rowValues[10], + rowValues[11], rowValues[11], + rowValues[12], rowValues[12], + rowValues[13], rowValues[13], + rowValues[14], rowValues[14], + rowValues[15], rowValues[15], + rowValues[16], rowValues[16], + rowValues[17], rowValues[17], + ) +}