diff --git a/cmd/go_migrate/mssql-transform.go b/cmd/go_migrate/mssql-transform.go
new file mode 100644
index 0000000..930ee2d
--- /dev/null
+++ b/cmd/go_migrate/mssql-transform.go
@@ -0,0 +1,14 @@
+package main
+
+func mssqlUuidToBigEndian(mssqlUuid []byte) []byte {
+	if len(mssqlUuid) != 16 {
+		return mssqlUuid
+	}
+	pgUuid := make([]byte, 16)
+	pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0]
+	pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4]
+	pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6]
+	copy(pgUuid[8:], mssqlUuid[8:])
+
+	return pgUuid
+}
diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go
index b0daaa4..01cc1e4 100644
--- a/cmd/go_migrate/process.go
+++ b/cmd/go_migrate/process.go
@@ -22,6 +22,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
 	logColumnTypes(targetColTypes, "Target col types")
 
 	chRowsExtract := make(chan []UnknownRowValues, QueueSize)
+	chRowsTransform := make(chan []UnknownRowValues)
 	mssqlContext := context.Background()
 
 	go func() {
@@ -31,11 +32,16 @@
 		close(chRowsExtract)
 	}()
 
-	var wgMssqlTransformers sync.WaitGroup
-	wgMssqlTransformers.Go(func() {
-		transformRows(job, sourceColTypes, "sqlserver", chRowsExtract)
+	go func() {
+		transformRowsMssql(sourceColTypes, chRowsExtract, chRowsTransform)
+		close(chRowsTransform)
+	}()
+
+	var wgFakeLoaders sync.WaitGroup
+
+	wgFakeLoaders.Go(func() {
+		fakeLoader(job, sourceColTypes, chRowsTransform)
 	})
-	wgMssqlTransformers.Wait()
 
 	chRowsExtractPostgres := make(chan []UnknownRowValues, QueueSize)
 	postgresContext := context.Background()
@@ -47,12 +53,11 @@
 		close(chRowsExtractPostgres)
 	}()
 
-	var wgPostgresTransformers sync.WaitGroup
-	wgPostgresTransformers.Go(func() {
-		transformRows(job, sourceColTypes, "postgres", chRowsExtractPostgres)
+	wgFakeLoaders.Go(func() {
+		fakeLoader(job, targetColTypes, chRowsExtractPostgres)
 	})
-	wgPostgresTransformers.Wait()
+	wgFakeLoaders.Wait()
 
 }
 
 func logColumnTypes(columnTypes []ColumnType, label string) {
@@ -63,24 +68,22 @@ func logColumnTypes(columnTypes []ColumnType, label string) {
 	}
 }
 
-func transformRows(job MigrationJob, columns []ColumnType, driver string, in <-chan []UnknownRowValues) {
+func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
 	for rows := range in {
-		log.Debugf("Chunk received (%s), transforming...", driver)
+		log.Debugf("Chunk received, transforming...")
 
-		for i, rowValues := range rows {
+		for _, rowValues := range rows {
 			for i, col := range columns {
 				value := rowValues[i]
 
-				if col.SystemType() == "uniqueidentifier" && driver == "sqlserver" {
+				if col.SystemType() == "uniqueidentifier" {
 					if b, ok := value.([]byte); ok {
 						rowValues[i] = mssqlUuidToBigEndian(b)
 					}
 				}
 			}
-
-			if i%100 == 0 {
-				logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
-			}
 		}
+
+		out <- rows
 	}
 }
@@ -91,15 +94,14 @@ func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowVa
 	}
 }
 
-func mssqlUuidToBigEndian(mssqlUuid []byte) []byte {
-	if len(mssqlUuid) != 16 {
-		return mssqlUuid
-	}
-	pgUuid := make([]byte, 16)
-	pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0]
-	pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4]
-	pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6]
-	copy(pgUuid[8:], mssqlUuid[8:])
+func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) {
+	for rows := range in {
+		log.Debugf("Chunk received, loading data into...")
-
-	return pgUuid
+
+		for i, rowValues := range rows {
+			if i%100 == 0 {
+				logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
+			}
+		}
+	}
 }