46 lines
1.3 KiB
Go
46 lines
1.3 KiB
Go
package main
|
|
|
|
import (
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
|
|
chunkCount := 0
|
|
totalRowsTransformed := 0
|
|
|
|
for rows := range in {
|
|
chunkStartTime := time.Now()
|
|
log.Debugf("Chunk #%d received, transforming %d rows...", chunkCount+1, len(rows))
|
|
|
|
for _, rowValues := range rows {
|
|
for i, col := range columns {
|
|
value := rowValues[i]
|
|
if col.SystemType() == "uniqueidentifier" {
|
|
if b, ok := value.([]byte); ok {
|
|
rowValues[i] = mssqlUuidToBigEndian(b)
|
|
}
|
|
} else if col.SystemType() == "geometry" || col.SystemType() == "geography" {
|
|
if b, ok := value.([]byte); ok {
|
|
rowValues[i] = wkbToEwkbWithSrid(b, 4326)
|
|
}
|
|
} else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" {
|
|
if t, ok := value.(time.Time); ok {
|
|
rowValues[i] = ensureUTC(t)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
chunkCount++
|
|
totalRowsTransformed += len(rows)
|
|
chunkDuration := time.Since(chunkStartTime)
|
|
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
|
|
log.Infof("Transformed chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows",
|
|
chunkCount, len(rows), chunkDuration, rowsPerSec, totalRowsTransformed)
|
|
|
|
out <- rows
|
|
}
|
|
}
|