diff --git a/cmd/go_migrate/build-extract-query.go b/cmd/go_migrate/build-extract-query.go
index df29617..b2a8c4c 100644
--- a/cmd/go_migrate/build-extract-query.go
+++ b/cmd/go_migrate/build-extract-query.go
@@ -17,7 +17,7 @@ func buildExtractQueryMssql(job MigrationJob, columns []ColumnType) string {
 		sbColumns.WriteString("]")
 
 		if col.unifiedType == "GEOMETRY" {
-			sbColumns.WriteString(".STAsWKB() AS [")
+			sbColumns.WriteString(".STAsBinary() AS [")
 			sbColumns.WriteString(col.name)
 			sbColumns.WriteString("]")
 		}
diff --git a/cmd/go_migrate/extractor.go b/cmd/go_migrate/extractor.go
new file mode 100644
index 0000000..fb242e9
--- /dev/null
+++ b/cmd/go_migrate/extractor.go
@@ -0,0 +1,84 @@
+package main
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+
+	_ "github.com/microsoft/go-mssqldb"
+	log "github.com/sirupsen/logrus"
+)
+
+// UnknownRowValues holds the scanned values of a single result row, one
+// entry per column, typed however the driver returned them.
+type UnknownRowValues []any
+
+// extractFromMssql runs the extract query built for job against db and
+// streams the resulting rows to out in chunks of at most chunkSize rows.
+//
+// It only sends on out and never closes it; the caller owns the channel. A
+// receiver must be draining out concurrently, otherwise the send blocks once
+// the channel buffer is full. The returned error is the first failure from
+// the query, from scanning or iterating rows, or ctx.Err() when ctx is
+// cancelled while a chunk is waiting to be sent.
+func extractFromMssql(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *sql.DB, out chan<- []UnknownRowValues) error {
+	query := buildExtractQueryMssql(job, columns)
+	log.Debug("Query used to extract data from mssql: ", query)
+
+	rows, err := db.QueryContext(ctx, query)
+	if err != nil {
+		return fmt.Errorf("querying mssql: %w", err)
+	}
+	defer rows.Close()
+
+	// make panics on a negative capacity; treat any non-positive size as 1.
+	if chunkSize < 1 {
+		chunkSize = 1
+	}
+	rowsChunk := make([]UnknownRowValues, 0, chunkSize)
+
+	for rows.Next() {
+		values := make([]any, len(columns))
+		scanArgs := make([]any, len(columns))
+		for i := range values {
+			scanArgs[i] = &values[i]
+		}
+
+		if err := rows.Scan(scanArgs...); err != nil {
+			return fmt.Errorf("scanning mssql row: %w", err)
+		}
+
+		rowsChunk = append(rowsChunk, values)
+
+		if len(rowsChunk) >= chunkSize {
+			if err := sendChunk(ctx, job, out, rowsChunk); err != nil {
+				return err
+			}
+			// The sent slice now belongs to the receiver; start a fresh one.
+			rowsChunk = make([]UnknownRowValues, 0, chunkSize)
+		}
+	}
+
+	// rows.Next also returns false on failure, so check why the loop ended.
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("iterating mssql rows: %w", err)
+	}
+
+	if len(rowsChunk) > 0 {
+		return sendChunk(ctx, job, out, rowsChunk)
+	}
+
+	return nil
+}
+
+// sendChunk delivers chunk to out, or returns ctx.Err() if ctx is cancelled
+// before the send can complete.
+func sendChunk(ctx context.Context, job MigrationJob, out chan<- []UnknownRowValues, chunk []UnknownRowValues) error {
+	select {
+	case out <- chunk:
+		log.Infof("Chunk send... %+v", job)
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
diff --git a/cmd/go_migrate/main.go b/cmd/go_migrate/main.go
index a655b8a..93546e7 100644
--- a/cmd/go_migrate/main.go
+++ b/cmd/go_migrate/main.go
@@ -18,6 +18,16 @@ var migrationJobs []MigrationJob = []MigrationJob{
 	},
 }
 
+// Pipeline tuning parameters. ChunkSize is the number of rows per chunk
+// handed to extractFromMssql and QueueSize is the buffer capacity of the
+// extracted-rows channel. NumExtractors is not referenced by this change;
+// presumably the number of concurrent extractors (TODO: confirm).
+const (
+	NumExtractors int = 2
+	ChunkSize     int = 20
+	QueueSize     int = 10
+)
+
 func main() {
 	configureLog()
 	log.Info("Starting migration...")
diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go
index 6f1c099..70ce92b 100644
--- a/cmd/go_migrate/process.go
+++ b/cmd/go_migrate/process.go
@@ -1,7 +1,10 @@
 package main
 
 import (
+	"context"
 	"database/sql"
+	"fmt"
+
 	"github.com/jackc/pgx/v5/pgxpool"
 
 	_ "github.com/microsoft/go-mssqldb"
@@ -17,12 +20,26 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
 	logColumnTypes(sourceColTypes, "Source col types")
 	logColumnTypes(targetColTypes, "Target col types")
 
-	sourceQuery := buildExtractQueryMssql(job, sourceColTypes)
+	chRowsExtract := make(chan []UnknownRowValues, QueueSize)
 
-	log.Debug(sourceQuery)
+	mssqlContext, cancelMssql := context.WithCancel(context.Background())
+	defer cancelMssql()
 
-	targetQuery := buildExtractQueryPostgres(job, targetColTypes)
-	log.Debug(targetQuery)
+	// Run the extractor concurrently with the consumer: called inline it
+	// blocks forever once the channel buffer is full, because nothing is
+	// receiving yet.
+	extractErr := make(chan error, 1)
+	go func() {
+		// The sender closes the channel so the consumer's range loop ends.
+		defer close(chRowsExtract)
+		extractErr <- extractFromMssql(mssqlContext, job, sourceColTypes, ChunkSize, sourceDb, chRowsExtract)
+	}()
+
+	transformRowsMssql(job, sourceColTypes, chRowsExtract)
+
+	if err := <-extractErr; err != nil {
+		log.Fatal("Unexpected error extracting data from mssql: ", err)
+	}
 }
 
 func logColumnTypes(columnTypes []ColumnType, label string) {
@@ -32,3 +49,27 @@ func logColumnTypes(columnTypes []ColumnType, label string) {
 		log.Infof("%+v", col)
 	}
 }
+
+// transformRowsMssql drains in until it is closed. For now it only logs the
+// row at every 100th index within each received chunk as a sample.
+func transformRowsMssql(job MigrationJob, columns []ColumnType, in <-chan []UnknownRowValues) {
+	for rows := range in {
+		log.Debug("Chunk received, transforming...")
+
+		for i, rowValues := range rows {
+			if i%100 == 0 {
+				logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
+			}
+		}
+	}
+}
+
+// logSampleRow logs every column of rowValues under a header naming the
+// job's schema and table plus tag. rowValues must have at least
+// len(columns) entries.
+func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) {
+	log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag)
+	for i, col := range columns {
+		log.Infof("%s: %v", col.Name(), rowValues[i])
+	}
+}