diff --git a/cmd/go_migrate/loader.go b/cmd/go_migrate/loader.go new file mode 100644 index 0000000..878da9e --- /dev/null +++ b/cmd/go_migrate/loader.go @@ -0,0 +1,19 @@ +package main + +import ( + "fmt" + + log "github.com/sirupsen/logrus" +) + +func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) { + for rows := range in { + log.Debugf("Chunk received, loading data into...") + + for i, rowValues := range rows { + if i%100 == 0 { + logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i)) + } + } + } +} diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index 8cdf80c..afa38dc 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -3,9 +3,7 @@ package main import ( "context" "database/sql" - "fmt" "sync" - "time" "github.com/jackc/pgx/v5/pgxpool" @@ -69,48 +67,9 @@ func logColumnTypes(columnTypes []ColumnType, label string) { } } -func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) { - for rows := range in { - log.Debugf("Chunk received, transforming...") - - for _, rowValues := range rows { - for i, col := range columns { - value := rowValues[i] - if col.SystemType() == "uniqueidentifier" { - if b, ok := value.([]byte); ok { - rowValues[i] = mssqlUuidToBigEndian(b) - } - } else if col.SystemType() == "geometry" || col.SystemType() == "geography" { - if b, ok := value.([]byte); ok { - rowValues[i] = wkbToEwkbWithSrid(b, 4326) - } - } else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" { - if t, ok := value.(time.Time); ok { - rowValues[i] = ensureUTC(t) - } - } - } - } - - out <- rows - } -} - func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) { log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag) for i, col := range columns { log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i]) } } - -func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) { - for rows := range in { - log.Debugf("Chunk received, loading data into...") - - for i, rowValues := range rows { - if i%100 == 0 { - logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i)) - } - } - } -} diff --git a/cmd/go_migrate/transformer.go b/cmd/go_migrate/transformer.go new file mode 100644 index 0000000..330f16b --- /dev/null +++ b/cmd/go_migrate/transformer.go @@ -0,0 +1,34 @@ +package main + +import ( + "time" + + log "github.com/sirupsen/logrus" +) + +func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) { + for rows := range in { + log.Debugf("Chunk received, transforming...") + + for _, rowValues := range rows { + for i, col := range columns { + value := rowValues[i] + if col.SystemType() == "uniqueidentifier" { + if b, ok := value.([]byte); ok { + rowValues[i] = mssqlUuidToBigEndian(b) + } + } else if col.SystemType() == "geometry" || col.SystemType() == "geography" { + if b, ok := value.([]byte); ok { + rowValues[i] = wkbToEwkbWithSrid(b, 4326) + } + } else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" { + if t, ok := value.(time.Time); ok { + rowValues[i] = ensureUTC(t) + } + } + } + } + + out <- rows + } +}