feat: implement MSSQL row transformation and loading functions
This commit is contained in:
19
cmd/go_migrate/loader.go
Normal file
19
cmd/go_migrate/loader.go
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) {
|
||||||
|
for rows := range in {
|
||||||
|
log.Debugf("Chunk received, loading data into...")
|
||||||
|
|
||||||
|
for i, rowValues := range rows {
|
||||||
|
if i%100 == 0 {
|
||||||
|
logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,9 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
|
||||||
@@ -69,48 +67,9 @@ func logColumnTypes(columnTypes []ColumnType, label string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
|
|
||||||
for rows := range in {
|
|
||||||
log.Debugf("Chunk received, transforming...")
|
|
||||||
|
|
||||||
for _, rowValues := range rows {
|
|
||||||
for i, col := range columns {
|
|
||||||
value := rowValues[i]
|
|
||||||
if col.SystemType() == "uniqueidentifier" {
|
|
||||||
if b, ok := value.([]byte); ok {
|
|
||||||
rowValues[i] = mssqlUuidToBigEndian(b)
|
|
||||||
}
|
|
||||||
} else if col.SystemType() == "geometry" || col.SystemType() == "geography" {
|
|
||||||
if b, ok := value.([]byte); ok {
|
|
||||||
rowValues[i] = wkbToEwkbWithSrid(b, 4326)
|
|
||||||
}
|
|
||||||
} else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" {
|
|
||||||
if t, ok := value.(time.Time); ok {
|
|
||||||
rowValues[i] = ensureUTC(t)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
out <- rows
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) {
|
func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) {
|
||||||
log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag)
|
log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag)
|
||||||
for i, col := range columns {
|
for i, col := range columns {
|
||||||
log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
|
log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) {
|
|
||||||
for rows := range in {
|
|
||||||
log.Debugf("Chunk received, loading data into...")
|
|
||||||
|
|
||||||
for i, rowValues := range rows {
|
|
||||||
if i%100 == 0 {
|
|
||||||
logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
34
cmd/go_migrate/transformer.go
Normal file
34
cmd/go_migrate/transformer.go
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
|
||||||
|
for rows := range in {
|
||||||
|
log.Debugf("Chunk received, transforming...")
|
||||||
|
|
||||||
|
for _, rowValues := range rows {
|
||||||
|
for i, col := range columns {
|
||||||
|
value := rowValues[i]
|
||||||
|
if col.SystemType() == "uniqueidentifier" {
|
||||||
|
if b, ok := value.([]byte); ok {
|
||||||
|
rowValues[i] = mssqlUuidToBigEndian(b)
|
||||||
|
}
|
||||||
|
} else if col.SystemType() == "geometry" || col.SystemType() == "geography" {
|
||||||
|
if b, ok := value.([]byte); ok {
|
||||||
|
rowValues[i] = wkbToEwkbWithSrid(b, 4326)
|
||||||
|
}
|
||||||
|
} else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" {
|
||||||
|
if t, ok := value.(time.Time); ok {
|
||||||
|
rowValues[i] = ensureUTC(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out <- rows
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user