feat: implement MSSQL data extraction and transformation process
This commit is contained in:
52
cmd/go_migrate/extractor.go
Normal file
52
cmd/go_migrate/extractor.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
_ "github.com/microsoft/go-mssqldb"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type UnknownRowValues []any
|
||||
|
||||
func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *sql.DB, out chan<- []UnknownRowValues) error {
|
||||
query := buildExtractQueryMssql(job, columns)
|
||||
log.Debug("Query used to extract data from mssql: ", query)
|
||||
|
||||
rows, err := db.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
rowsChunk := make([]UnknownRowValues, 0, chunkSize)
|
||||
|
||||
for rows.Next() {
|
||||
values := make([]any, len(columns))
|
||||
scanArgs := make([]any, len(columns))
|
||||
|
||||
for i := range values {
|
||||
scanArgs[i] = &values[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanArgs...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsChunk = append(rowsChunk, values)
|
||||
|
||||
if len(rowsChunk) >= chunkSize {
|
||||
out <- rowsChunk
|
||||
rowsChunk = make([]UnknownRowValues, 0, chunkSize)
|
||||
log.Infof("Chunk send... %+v", job)
|
||||
}
|
||||
}
|
||||
|
||||
if len(rowsChunk) > 0 {
|
||||
out <- rowsChunk
|
||||
log.Infof("Chunk send... %+v", job)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user