feat: add MANZANA migration job and update related processing logic

This commit is contained in:
2026-04-08 10:16:27 -05:00
parent 3765e8adb3
commit 75b04d4b2e
5 changed files with 19 additions and 18 deletions

View File

@@ -13,6 +13,11 @@ type MigrationJob struct {
} }
var migrationJobs []MigrationJob = []MigrationJob{ var migrationJobs []MigrationJob = []MigrationJob{
{
Schema: "Cartografia",
Table: "MANZANA",
PrimaryKey: "GDB_ARCHIVE_OID",
},
{ {
Schema: "Red", Schema: "Red",
Table: "PUERTO", Table: "PUERTO",

View File

@@ -79,17 +79,17 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
}() }()
var wgPostgresLoaders sync.WaitGroup var wgPostgresLoaders sync.WaitGroup
// postgresLoaderCtx := context.Background() postgresLoaderCtx := context.Background()
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders) log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
loaderStartTime := time.Now() loaderStartTime := time.Now()
for range NumLoaders { for range NumLoaders {
wgPostgresLoaders.Go(func() { wgPostgresLoaders.Go(func() {
// if err := loadRowsPostgres(postgresLoaderCtx, job, sourceColTypes, targetDb, chRowsTransform); err != nil { if err := loadRowsPostgres(postgresLoaderCtx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
// log.Error("Unexpected error loading data into postgres: ", err) log.Error("Unexpected error loading data into postgres: ", err)
// } }
fakeLoader(job, sourceColTypes, chRowsTransform) // fakeLoader(job, sourceColTypes, chRowsTransform)
}) })
} }

View File

@@ -14,8 +14,6 @@ import (
const ( const (
totalRows int = 1_000_000 totalRows int = 1_000_000
chunkSize int = 50_000 chunkSize int = 50_000
schema string = "Red"
table string = "PUERTO"
queueSize int = 4 queueSize int = 4
) )
@@ -39,7 +37,7 @@ func main() {
var wgSeed sync.WaitGroup var wgSeed sync.WaitGroup
wgSeed.Go(func() { wgSeed.Go(func() {
seedPuertos(ctx, db) seedManzanas(ctx, db)
}) })
wgSeed.Wait() wgSeed.Wait()

View File

@@ -55,12 +55,10 @@ func generatePuertoRows(
) { ) {
defer close(out) defer close(out)
rand.Seed(time.Now().UnixNano())
rowsGenerated := 0 rowsGenerated := 0
currentChunk := make([]UnknownRowValues, 0, chunkSize) currentChunk := make([]UnknownRowValues, 0, chunkSize)
for i := 0; i < totalRows; i++ { for range totalRows {
row := generatePuertoRow() row := generatePuertoRow()
currentChunk = append(currentChunk, row) currentChunk = append(currentChunk, row)
rowsGenerated++ rowsGenerated++

View File

@@ -13,13 +13,18 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
var manzanaJob = MigrationJob{
Schema: "Cartografia",
Table: "MANZANA",
}
func getMaxGDBArchiveOid(ctx context.Context, db *sql.DB) (int, error) { func getMaxGDBArchiveOid(ctx context.Context, db *sql.DB) (int, error) {
var maxOid sql.NullInt64 var maxOid sql.NullInt64
query := fmt.Sprintf(` query := fmt.Sprintf(`
SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0) SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0)
FROM [%s].[%s] FROM [%s].[%s]
`, schema, table) `, manzanaJob.Schema, manzanaJob.Table)
err := db.QueryRowContext(ctx, query).Scan(&maxOid) err := db.QueryRowContext(ctx, query).Scan(&maxOid)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
@@ -211,12 +216,7 @@ func seedManzanas(ctx context.Context, db *sql.DB) error {
"OBJECTID", "OBJECTID",
} }
job := MigrationJob{ if err := loadRowsMssql(ctx, manzanaJob, columns, db, rowsChan); err != nil {
Schema: schema,
Table: table,
}
if err := loadRowsMssql(ctx, job, columns, db, rowsChan); err != nil {
return fmt.Errorf("Error loading rows (MANZANA): %w", err) return fmt.Errorf("Error loading rows (MANZANA): %w", err)
} }