package main import ( "context" "database/sql" "fmt" "sync" "time" mssql "github.com/microsoft/go-mssqldb" log "github.com/sirupsen/logrus" ) const ( totalRows int = 1_000_000 chunkSize int = 50_000 schema string = "Red" table string = "PUERTO" queueSize int = 4 ) func main() { log.SetFormatter(&log.TextFormatter{ FullTimestamp: true, TimestampFormat: time.StampMilli, DisableSorting: false, PadLevelText: true, }) log.SetLevel(log.DebugLevel) db, connError := connectToSqlServer() if connError != nil { log.Fatal("Connection error: ", connError) } defer db.Close() ctx := context.Background() var wgSeed sync.WaitGroup wgSeed.Go(func() { seedPuertos(ctx, db) }) wgSeed.Wait() } func loadRowsMssql(ctx context.Context, job MigrationJob, colNames []string, db *sql.DB, in <-chan []UnknownRowValues) error { chunkCount := 0 totalRowsLoaded := 0 for rows := range in { chunkStartTime := time.Now() tx, err := db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("error starting transaction: %w", err) } fullTableName := fmt.Sprintf("[%s].[%s]", job.Schema, job.Table) stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, colNames...)) if err != nil { tx.Rollback() return fmt.Errorf("error preparing bulk copy statement: %w", err) } copyStartTime := time.Now() for _, row := range rows { _, err = stmt.ExecContext(ctx, row...) if err != nil { stmt.Close() tx.Rollback() return fmt.Errorf("error executing row insert: %w", err) } } result, err := stmt.ExecContext(ctx) if err != nil { stmt.Close() tx.Rollback() return fmt.Errorf("error flushing bulk data: %w", err) } err = stmt.Close() if err != nil { tx.Rollback() return fmt.Errorf("error closing statement: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("error committing transaction: %w", err) } rowsAffected, _ := result.RowsAffected() chunkCount++ totalRowsLoaded += int(rowsAffected) copyDuration := time.Since(copyStartTime) chunkDuration := time.Since(chunkStartTime) rowsPerSec := float64(len(rows)) / chunkDuration.Seconds() log.Infof("Loaded chunk #%d (MSSQL): %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded) } return nil }