57 Commits

Author SHA1 Message Date
ec96532d04 refactor: optimize row handling in mssql extractor and transformer 2026-04-17 01:10:45 -05:00
46597c4ffd refactor: implement extractor retry logic and streamline extractor interface 2026-04-17 00:33:49 -05:00
15d1b96849 refactor: streamline error handling and remove redundant code in mssql extractor 2026-04-17 00:23:01 -05:00
73b65e2a3f refactor: remove extractor error channel and simplify retry logic in mssql and postgres extractors 2026-04-17 00:07:51 -05:00
DiegoAlessandroMotta
1c3db39b21 update extractor interface 2026-04-16 23:49:23 -05:00
39c0d99502 feat: extend context timeout to 1 minute for database queries in mssql.go and postgres.go 2026-04-16 13:19:46 -05:00
b418ded78b feat: update SQL query to filter out specific column names in mssql.go 2026-04-16 13:14:39 -05:00
0d0511716f feat: add row count mismatch error handling in processMigrationJob and update SQL query to exclude additional graph-related columns 2026-04-16 12:46:55 -05:00
67fb0148ae feat: add Makefile for building binaries across platforms 2026-04-16 11:51:05 -05:00
098cf36e3c feat: comment out TRUNCATE statements in pre_sql for MANZANA and PUERTO jobs 2026-04-16 11:46:24 -05:00
5484716b81 feat: add source and target database type fields to MigrationConfig 2026-04-16 09:08:21 -05:00
df4c3bc390 feat: refactor db handling to use db-wrapper package; enhance connection management and result handling for MSSQL and Postgres 2026-04-16 08:48:29 -05:00
ea41a7c218 feat: register MSSQL and Postgres drivers in db-wrapper for improved factory pattern support 2026-04-15 23:09:56 -05:00
f09284ecdc feat: enhance db-wrapper with improved MSSQL and Postgres implementations; add row result handling and dialect support 2026-04-15 22:55:14 -05:00
0384d5423f feat: add range configuration to job settings for enhanced data processing control 2026-04-15 20:23:45 -05:00
1ce3d9e153 refactor: update partition handling to use Range struct for better clarity and consistency 2026-04-15 20:23:45 -05:00
DiegoAlessandroMotta
ed889b740a add db-wrapper package types 2026-04-15 20:22:23 -05:00
803f8988b8 refactor: update extractor interfaces to return row counts instead of using pointers for rows read 2026-04-13 19:25:18 -05:00
33c9cd9c3e feat: implement database wrapper interfaces for MSSQL and Postgres; enhance migration job processing with pre and post SQL execution 2026-04-13 07:57:18 -05:00
85074da2ec feat: add max partition and chunk error limits to extractor and loader error handlers 2026-04-12 20:57:31 -05:00
f126d5bbd0 feat: implement exponential backoff strategy for error handling in extractor and loader processes; enhance retry configuration options 2026-04-12 20:35:29 -05:00
5633dc98d0 fix: enhance error handling in extractor and loader processes; ensure proper job error propagation and logging 2026-04-12 20:11:19 -05:00
01780b4b02 refactor: remove unused ColumnType and inspect-columns files; update migration job to use separate table analyzers for source and target databases 2026-04-12 19:16:14 -05:00
aded502ee4 feat: implement Postgres table analyzer with column type querying and metadata retrieval 2026-04-12 13:38:36 -05:00
4d3cd6e4cf feat: add MSSQL table analyzer and integrate partition range generation for improved data migration 2026-04-11 01:23:13 -05:00
7830ae862d refactor: rename batch-related variables and functions for consistency and clarity 2026-04-11 00:44:12 -05:00
955bc65ce9 refactor: rename Batch to Partition in error handling and processing functions for consistency 2026-04-11 00:32:50 -05:00
9eb9821daf refactor: rename Batch to Partition and update related types and channels for consistency 2026-04-11 00:09:28 -05:00
cd0e53b1d2 feat: implement MSSQL extractor, transformer, and Postgres loader for enhanced data migration 2026-04-10 23:39:37 -05:00
1be7018ba3 feat: refactor configuration to include source and target database types 2026-04-10 22:58:57 -05:00
a5b5a04feb feat: update extractor and transformer constructors to return Extractor interface 2026-04-10 20:42:16 -05:00
c1bae79f98 feat: implement Postgres loader and refactor migration job processing 2026-04-10 20:40:01 -05:00
053e6bd673 feat: add MSSQL extractor and transformer implementations for improved data migration 2026-04-10 19:59:44 -05:00
eb3c3bbfce feat: refactor error handling to use custom_errors.LoaderError for improved error management 2026-04-10 19:35:38 -05:00
9493a2d32f feat: refactor error handling to accept max retry attempts as a parameter for improved flexibility 2026-04-10 19:32:12 -05:00
d228a048b8 feat: update extractor error handling to use models.UnknownRowValues for improved type consistency 2026-04-10 19:29:07 -05:00
ca621352c9 feat: refactor models to improve type handling and enhance error management across migration processes 2026-04-10 19:27:27 -05:00
c2ea84bfcf feat: implement extractor error handling and batch processing for MSSQL and Postgres 2026-04-10 19:06:41 -05:00
6345a0d694 feat: enhance migration job processing with detailed metrics and error handling 2026-04-09 21:55:19 -05:00
1db35c796c feat: enhance migration job processing with parallel execution and improved logging 2026-04-09 20:02:04 -05:00
0d9f955b2f feat: enhance batch processing by adding rowsPerBatch parameter and improving logging messages 2026-04-09 19:46:45 -05:00
524d892a60 feat: refactor migration job structure to use SourceTableInfo and TargetTableInfo for improved configuration handling 2026-04-09 19:20:50 -05:00
e8ace6ecf9 feat: implement job configuration structure and YAML parsing for migration jobs 2026-04-09 18:12:11 -05:00
adbc962464 feat: add configuration parsing and job management in YAML format 2026-04-09 17:47:58 -05:00
a0b51f40c1 feat: add context support to migration job processing for improved cancellation and error handling 2026-04-09 00:43:11 -05:00
b64a76ca45 feat: improve error handling and job cancellation in migration process 2026-04-09 00:38:16 -05:00
51480015ba feat: enhance concurrency management by adding WaitGroup support in extractors and loaders 2026-04-09 00:22:30 -05:00
dc632361e5 feat: implement loader error handling and refactor chunk processing in migration job 2026-04-08 23:42:31 -05:00
0ee5d9032c feat: add context support to error handlers for improved cancellation and error management 2026-04-08 23:07:41 -05:00
d3a3b26bb3 feat: enhance error handling and context management in MSSQL extraction process 2026-04-08 22:39:07 -05:00
554618daad feat: refactor transformation logic in MSSQL processing to use context and improve error handling 2026-04-08 21:47:24 -05:00
7924dd3af7 feat: refactor chunk handling in loader and transformer for improved data processing 2026-04-08 21:11:26 -05:00
f6dfcd390f feat: refactor chunk handling in extractor and transformer for improved data processing 2026-04-08 21:09:26 -05:00
853be4a5a6 feat: update chunk size for MSSQL processing and enhance error handling in transformation functions 2026-04-08 20:48:36 -05:00
eeef3bc813 fix: correct variable name for job error in MSSQL extraction 2026-04-08 20:39:16 -05:00
e158986947 feat: enhance error handling with JobError struct and update extractor logic 2026-04-08 20:21:58 -05:00
bc6f9a6a70 feat: implement batch processing for MSSQL with improved structure and logging 2026-04-08 19:20:09 -05:00
44 changed files with 2667 additions and 904 deletions

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@
*.dll
*.so
*.dylib
bin/
# Test binary, built with `go test -c`
*.test

80
Makefile Normal file
View File

@@ -0,0 +1,80 @@
.PHONY: build build-linux build-windows build-all clean help
# Variables
BINARY_NAME=go-migrate
CMD_PATH=./cmd/go_migrate
OUTPUT_DIR=bin
VERSION?=$(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
GIT_COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
# Flags de compilación
LD_FLAGS=-ldflags="-s -w -X main.Version=$(VERSION) -X main.BuildTime=$(BUILD_TIME) -X main.GitCommit=$(GIT_COMMIT)"
# Default: compilar para el SO actual
build: build-$(OS)
ifeq ($(OS),Windows_NT)
build-native: build-windows
else
build-native: build-linux
endif
# Compilar para Linux (sin CGO para máxima compatibilidad)
build-linux:
@echo "Compilando para Linux..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64"
# Compilar para Windows
build-windows:
@echo "Compilando para Windows..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe"
# Compilar para ambas plataformas
build-all: build-linux build-windows
@echo ""
@echo "Binarios compilados:"
@ls -lh $(OUTPUT_DIR)/$(BINARY_NAME)*
# Compilar para Linux arm64 (opcional, para Raspberry Pi, etc.)
build-linux-arm64:
@echo "Compilando para Linux ARM64..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64"
# Limpiar binarios
clean:
@echo "Limpiando binarios..."
@rm -rf $(OUTPUT_DIR)
@echo "Limpieza completada"
# Ayuda
help:
@echo "Comandos disponibles:"
@echo ""
@echo " make build - Compilar para el SO actual (Linux/Windows)"
@echo " make build-linux - Compilar para Linux x86_64"
@echo " make build-windows - Compilar para Windows x86_64"
@echo " make build-linux-arm64 - Compilar para Linux ARM64 (opcional)"
@echo " make build-all - Compilar para Linux y Windows"
@echo " make clean - Eliminar binarios compilados"
@echo " make help - Mostrar esta ayuda"
@echo ""
@echo "Ejemplos de uso:"
@echo " make build-all # Crear binarios para ambas plataformas"
@echo " make build-linux OS= # Crear solo para Linux"
@echo ""

View File

@@ -1,66 +0,0 @@
package main
import (
"fmt"
"strings"
)
func buildExtractQueryMssql(job MigrationJob, columns []ColumnType, includeRange bool) string {
var sbQuery strings.Builder
sbQuery.WriteString("SELECT ")
if len(columns) == 0 {
sbQuery.WriteString("*")
} else {
for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.name)
if col.unifiedType == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.name)
}
if i < len(columns)-1 {
sbQuery.WriteString(", ")
}
}
}
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", job.Schema, job.Table)
if includeRange {
fmt.Fprintf(&sbQuery, " WHERE [%s] BETWEEN @minRange AND @maxRange", job.PrimaryKey)
}
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", job.PrimaryKey)
return sbQuery.String()
}
func buildExtractQueryPostgres(job MigrationJob, columns []ColumnType) string {
var sbColumns strings.Builder
if len(columns) == 0 {
sbColumns.WriteString("*")
} else {
for i, col := range columns {
if col.unifiedType == "GEOMETRY" {
sbColumns.WriteString(`ST_AsEWKB("`)
sbColumns.WriteString(col.name)
sbColumns.WriteString(`") AS "`)
sbColumns.WriteString(col.name)
sbColumns.WriteString(`"`)
} else {
sbColumns.WriteString(`"`)
sbColumns.WriteString(col.name)
sbColumns.WriteString(`"`)
}
if i < len(columns)-1 {
sbColumns.WriteString(", ")
}
}
}
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), job.Schema, job.Table, job.PrimaryKey)
}

View File

@@ -1,91 +0,0 @@
package main
import (
"context"
"database/sql"
"fmt"
)
type BatchRange struct {
LowerLimit int
UpperLimit int
validRange bool
}
func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, job MigrationJob) (int, error) {
query := `
SELECT
SUM(p.rows) AS count
FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id
JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name`
var rowsCount int
err := db.QueryRowContext(ctx, query, sql.Named("schema", job.Schema), sql.Named("table", job.Table)).Scan(&rowsCount)
if err != nil {
return 0, err
}
return rowsCount, nil
}
func calculateChunkRangesMssql(ctx context.Context, db *sql.DB, job MigrationJob, batchCount int) ([]BatchRange, error) {
query := fmt.Sprintf(`
SELECT
MIN([%s]) AS lower_limit,
MAX([%s]) AS upper_limit
FROM
(SELECT [%s], NTILE(@batchCount) OVER (ORDER BY [%s]) AS chunk_id FROM [%s].[%s]) AS T
GROUP BY chunk_id
ORDER BY chunk_id`, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.Schema, job.Table)
rows, err := db.QueryContext(ctx, query, sql.Named("batchCount", batchCount))
if err != nil {
return nil, err
}
defer rows.Close()
batchRanges := make([]BatchRange, 0, batchCount)
for rows.Next() {
var br BatchRange
br.validRange = true
if err := rows.Scan(&br.LowerLimit, &br.UpperLimit); err != nil {
return nil, err
}
batchRanges = append(batchRanges, br)
}
if err := rows.Err(); err != nil {
return nil, err
}
return batchRanges, nil
}
const estimatedRowsPerBatch = 100_000
func calculateBatchMetrics(ctx context.Context, db *sql.DB, job MigrationJob) ([]BatchRange, error) {
rowsCount, err := estimateTotalRowsMssql(ctx, db, job)
if err != nil {
return nil, err
}
batchCount := 1
if rowsCount > estimatedRowsPerBatch {
batchCount = rowsCount / estimatedRowsPerBatch
} else {
return []BatchRange{{validRange: false}}, nil
}
chunksRange, err := calculateChunkRangesMssql(ctx, db, job, batchCount)
if err != nil {
return nil, err
}
return chunksRange, nil
}

View File

@@ -1,111 +0,0 @@
package main
import (
"context"
"database/sql"
"time"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
type UnknownRowValues = []any
func extractFromMssql(ctx context.Context, db *sql.DB, job MigrationJob, columns []ColumnType, chunkSize int, batchRange BatchRange, out chan<- []UnknownRowValues) error {
query := buildExtractQueryMssql(job, columns, batchRange.validRange)
log.Debug("Query used to extract data from mssql: ", query)
var queryArgs []any
if batchRange.validRange {
queryArgs = append(queryArgs,
sql.Named("minRange", batchRange.LowerLimit),
sql.Named("maxRange", batchRange.UpperLimit),
)
}
queryStartTime := time.Now()
rows, err := db.QueryContext(ctx, query, queryArgs...)
if err != nil {
return err
}
defer rows.Close()
log.Debugf("Query executed in %v", time.Since(queryStartTime))
rowsChunk := make([]UnknownRowValues, 0, chunkSize)
totalRowsExtracted := 0
chunkCount := 0
chunkStartTime := time.Now()
for rows.Next() {
values := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range values {
scanArgs[i] = &values[i]
}
if err := rows.Scan(scanArgs...); err != nil {
return err
}
rowsChunk = append(rowsChunk, values)
totalRowsExtracted++
if len(rowsChunk) >= chunkSize {
chunkCount++
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(chunkSize) / chunkDuration.Seconds()
log.Infof("Extracted chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows", chunkCount, len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
out <- rowsChunk
rowsChunk = make([]UnknownRowValues, 0, chunkSize)
chunkStartTime = time.Now()
}
}
if len(rowsChunk) > 0 {
chunkCount++
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
log.Infof("Extracted final chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows",
chunkCount, len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
out <- rowsChunk
}
return rows.Err()
}
func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error {
query := buildExtractQueryPostgres(job, columns)
log.Debug("Query used to extract data from postgres: ", query)
rows, err := db.Query(ctx, query)
if err != nil {
return err
}
defer rows.Close()
rowsChunk := make([]UnknownRowValues, 0, chunkSize)
for rows.Next() {
values, err := rows.Values()
if err != nil {
return err
}
rowsChunk = append(rowsChunk, values)
if len(rowsChunk) >= chunkSize {
out <- rowsChunk
rowsChunk = make([]UnknownRowValues, 0, chunkSize)
log.Infof("Chunk send... %+v", job)
}
}
if len(rowsChunk) > 0 {
out <- rowsChunk
log.Infof("Chunk send... %+v", job)
}
return nil
}

View File

@@ -1,283 +0,0 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func GetUnifiedType(systemType string) string {
systemType = strings.ToLower(systemType)
if systemType == "varchar" || systemType == "char" || systemType == "nvarchar" || systemType == "nchar" || systemType == "text" || systemType == "ntext" {
return "STRING"
}
if systemType == "int" || systemType == "int4" || systemType == "integer" || systemType == "smallint" || systemType == "int2" || systemType == "bigint" || systemType == "int8" || systemType == "tinyint" {
return "INTEGER"
}
if systemType == "decimal" || systemType == "numeric" {
return "DECIMAL"
}
if systemType == "float" || systemType == "real" || systemType == "double precision" {
return "FLOAT"
}
if systemType == "bit" || systemType == "boolean" {
return "BOOLEAN"
}
if systemType == "date" {
return "DATE"
}
if systemType == "time" || systemType == "time without time zone" {
return "TIME"
}
if systemType == "datetime" || systemType == "datetime2" || systemType == "timestamp" || systemType == "timestamptz" || systemType == "timestamp with time zone" {
return "TIMESTAMP"
}
if systemType == "binary" || systemType == "varbinary" || systemType == "image" || systemType == "bytea" {
return "BINARY"
}
if systemType == "uniqueidentifier" || systemType == "uuid" {
return "UUID"
}
if systemType == "json" {
return "JSON"
}
if systemType == "geometry" || systemType == "geography" {
return "GEOMETRY"
}
return strings.ToUpper(systemType)
}
func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, scale *int64) ColumnType {
stringTypes := map[string]bool{
"varchar": true, "char": true, "character": true, "text": true, "character varying": true,
}
decimalTypes := map[string]bool{
"decimal": true, "numeric": true,
}
if stringTypes[column.systemType] {
if maxLength != nil {
column.maxLength = *maxLength
column.hasMaxLength = true
} else {
column.maxLength = -1
column.hasMaxLength = false
}
column.hasPrecisionScale = false
column.precision = -1
column.scale = -1
} else if decimalTypes[column.systemType] {
column.hasMaxLength = false
column.maxLength = -1
if precision != nil && scale != nil {
column.precision = *precision
column.scale = *scale
column.hasPrecisionScale = true
} else {
column.precision = -1
column.scale = -1
column.hasPrecisionScale = false
}
} else {
column.hasMaxLength = false
column.maxLength = -1
column.hasPrecisionScale = false
column.precision = -1
column.scale = -1
}
column.unifiedType = GetUnifiedType(column.systemType)
return column
}
func GetColumnTypesPostgres(db *pgxpool.Pool, migrationJob MigrationJob) ([]ColumnType, error) {
query := `
SELECT
c.column_name AS name,
c.data_type AS user_type,
c.udt_name AS system_type,
(CASE WHEN c.is_nullable = 'YES' THEN TRUE ELSE FALSE END) AS nullable,
c.character_maximum_length AS max_length,
c.numeric_precision AS precision,
c.numeric_scale AS scale
FROM information_schema.columns c
WHERE c.table_schema = $1 AND c.table_name = $2
ORDER BY c.ordinal_position;
`
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
rows, err := db.Query(ctx, query, migrationJob.Schema, migrationJob.Table)
if err != nil {
return nil, fmt.Errorf("Error querying column types: %w", err)
}
defer rows.Close()
var colTypes []ColumnType
for rows.Next() {
var column ColumnType
var scanMaxLength *int64
var scanPrecision *int64
var scanScale *int64
if err := rows.Scan(
&column.name,
&column.userType,
&column.systemType,
&column.nullable,
&scanMaxLength,
&scanPrecision,
&scanScale,
); err != nil {
return nil, fmt.Errorf("Error scanning column type results: %w", err)
}
colTypes = append(colTypes, MapPostgresColumn(column, scanMaxLength, scanPrecision, scanScale))
}
return colTypes, nil
}
func MapMssqlColumn(column ColumnType) ColumnType {
stringTypes := map[string]bool{
"varchar": true, "char": true, "nvarchar": true, "nchar": true, "text": true, "ntext": true,
}
decimalTypes := map[string]bool{
"decimal": true, "numeric": true,
}
if stringTypes[column.systemType] {
column.hasMaxLength = true
if column.systemType == "nvarchar" || column.systemType == "nchar" {
if column.maxLength > 0 {
column.maxLength = column.maxLength / 2
}
}
column.hasPrecisionScale = false
column.precision = -1
column.scale = -1
} else if decimalTypes[column.systemType] {
column.hasMaxLength = false
column.maxLength = -1
column.hasPrecisionScale = true
} else {
column.hasMaxLength = false
column.maxLength = -1
column.hasPrecisionScale = false
column.precision = -1
column.scale = -1
}
column.unifiedType = GetUnifiedType(column.systemType)
return column
}
func GetColumnTypesMssql(db *sql.DB, migrationJob MigrationJob) ([]ColumnType, error) {
query := `
SELECT
c.name AS name,
t.name AS user_type,
CASE WHEN t.is_user_defined = 0 THEN t.name ELSE bt.name END AS system_type,
c.is_nullable AS nullable,
c.max_length AS max_length,
c.precision AS precision,
c.scale AS scale
FROM sys.columns c
JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table
ORDER BY c.column_id;
`
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
rows, err := db.QueryContext(ctx, query, sql.Named("schema", migrationJob.Schema), sql.Named("table", migrationJob.Table))
if err != nil {
return nil, fmt.Errorf("Error querying column types: %w", err)
}
defer rows.Close()
var colTypes []ColumnType
for rows.Next() {
var column ColumnType
if err := rows.Scan(
&column.name,
&column.userType,
&column.systemType,
&column.nullable,
&column.maxLength,
&column.precision,
&column.scale,
); err != nil {
return nil, fmt.Errorf("Error scanning column type results: %W", err)
}
if strings.HasPrefix(column.name, "graph_id") && column.systemType == "bigint" {
continue
}
colTypes = append(colTypes, MapMssqlColumn(column))
}
return colTypes, nil
}
func GetColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob MigrationJob) ([]ColumnType, []ColumnType, error) {
var sourceDbErr error
var targetDbErr error
var sourceColTypes []ColumnType
var targetColTypes []ColumnType
var wg sync.WaitGroup
wg.Go(func() {
sourceColTypes, sourceDbErr = GetColumnTypesMssql(sourceDb, migrationJob)
if sourceDbErr != nil {
log.Error("Error (sourceDb): ", sourceDbErr)
}
})
wg.Go(func() {
targetColTypes, targetDbErr = GetColumnTypesPostgres(targetDb, migrationJob)
if targetDbErr != nil {
log.Error("Error (targetDb): ", targetDbErr)
}
})
wg.Wait()
if sourceDbErr != nil || targetDbErr != nil {
return nil, nil, errors.New("Error querying column types")
}
return sourceColTypes, targetColTypes, nil
}

View File

@@ -1,136 +0,0 @@
package main
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
mssql "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
for rows := range in {
log.Debugf("Chunk received, loading data into...")
for i, rowValues := range rows {
if i%100 == 0 {
logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
}
}
}
}
func loadRowsPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, db *pgxpool.Pool, in <-chan []UnknownRowValues) error {
chunkCount := 0
totalRowsLoaded := 0
for rows := range in {
chunkStartTime := time.Now()
identifier := pgx.Identifier{job.Schema, job.Table}
colNames := Map(columns, func(col ColumnType) string {
return col.name
})
copyStartTime := time.Now()
_, err := db.CopyFrom(
ctx,
identifier,
colNames,
pgx.CopyFromRows(rows),
)
if err != nil {
return err
}
chunkCount++
totalRowsLoaded += len(rows)
copyDuration := time.Since(copyStartTime)
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
log.Infof("Loaded chunk #%d: %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
}
return nil
}
func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error {
chunkCount := 0
totalRowsLoaded := 0
for rows := range in {
chunkStartTime := time.Now()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("error starting transaction: %w", err)
}
fullTableName := fmt.Sprintf("[%s].[%s]", job.Schema, job.Table)
colNames := Map(columns, func(col ColumnType) string {
return col.name
})
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, colNames...))
if err != nil {
tx.Rollback()
return fmt.Errorf("error preparing bulk copy statement: %w", err)
}
copyStartTime := time.Now()
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return fmt.Errorf("error executing row insert: %w", err)
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return fmt.Errorf("error flushing bulk data: %w", err)
}
err = stmt.Close()
if err != nil {
tx.Rollback()
return fmt.Errorf("error closing statement: %w", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("error committing transaction: %w", err)
}
rowsAffected, _ := result.RowsAffected()
chunkCount++
totalRowsLoaded += int(rowsAffected)
copyDuration := time.Since(copyStartTime)
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
log.Infof("Loaded chunk #%d (MSSQL): %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
}
return nil
}
func Map[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}

View File

@@ -1,57 +1,176 @@
package main
import (
"context"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
log "github.com/sirupsen/logrus"
)
type MigrationJob struct {
Schema string
Table string
PrimaryKey string
}
var migrationJobs []MigrationJob = []MigrationJob{
{
Schema: "Cartografia",
Table: "MANZANA",
PrimaryKey: "GDB_ARCHIVE_OID",
},
{
Schema: "Red",
Table: "PUERTO",
PrimaryKey: "ID_PUERTO",
},
}
const (
NumExtractors int = 4
NumLoaders int = 8
ChunkSize int = 25000
QueueSize int = 8
"golang.org/x/sync/errgroup"
)
func main() {
configureLog()
startTime := time.Now()
log.Info("=== Starting migration ===")
log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize)
sourceDb, targetDb, connError := connectToDatabases()
if connError != nil {
log.Fatal("Connection error: ", connError)
migrationConfig, err := config.ReadMigrationConfig()
if err != nil {
log.Fatalf("error leyendo configuracion: %v", err)
}
log.Debugf("Config: %+v", migrationConfig)
startTime := time.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Info("=== Starting migration ===")
var wgConnect errgroup.Group
var sourceDb, targetDb dbwrapper.DbWrapper
wgConnect.Go(func() error {
var err error
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
wgConnect.Go(func() error {
var err error
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
if err := wgConnect.Wait(); err != nil {
log.Error("Connection error: ", err)
return
}
defer sourceDb.Close()
defer targetDb.Close()
for _, job := range migrationJobs {
log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table)
processMigrationJob(sourceDb, targetDb, job)
results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
log.Info("=== RESUMEN DE MIGRACIÓN ===")
var totalProcessed, totalErrors int64
for _, res := range results {
status := "OK"
if res.Error != nil {
status = "FAILED"
log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v | Error: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration, res.Error)
} else {
log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
}
totalProcessed += res.RowsLoaded
if res.Error != nil {
totalErrors++
}
}
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
totalDuration := time.Since(startTime)
log.Infof("=== Migration completed successfully! ===")
log.Infof("Total migration time: %v", totalDuration)
}
func processMigrationJobs(
ctx context.Context,
sourceDb dbwrapper.DbWrapper,
targetDb dbwrapper.DbWrapper,
jobs []config.Job,
maxParallelWorkers int,
) []JobResult {
if len(jobs) == 0 {
log.Info("No migration jobs configured")
return []JobResult{}
}
if maxParallelWorkers <= 0 {
maxParallelWorkers = 1
}
if maxParallelWorkers > len(jobs) {
maxParallelWorkers = len(jobs)
}
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb)
transformer := transformers.NewMssqlTransformer()
loader := loaders.NewPostgresLoader(targetDb)
for i := range maxParallelWorkers {
wgJobs.Go(func() {
for job := range chJobs {
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
res := processMigrationJob(
ctx,
targetDb,
sourceTableAnalyzer,
targetTableAnalyzer,
extractor,
transformer,
loader,
job,
)
chJobResults <- res
}
})
}
for _, job := range jobs {
chJobs <- job
}
close(chJobs)
go func() {
wgJobs.Wait()
close(chJobResults)
}()
var finalResults []JobResult
for res := range chJobResults {
finalResults = append(finalResults, res)
}
return finalResults
}
func connectWithTimeout(ctx context.Context, dbType string, dbUrl string, timeout time.Duration) (dbwrapper.DbWrapper, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sourceDb, err := dbwrapper.New(dbType)
if err != nil {
return nil, err
}
if err = sourceDb.Connect(localCtx, dbUrl); err != nil {
return nil, err
}
return sourceDb, nil
}

13
cmd/go_migrate/metrics.go Normal file
View File

@@ -0,0 +1,13 @@
package main
import "time"
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}

View File

@@ -2,115 +2,230 @@ package main
import (
"context"
"database/sql"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job MigrationJob) {
jobStartTime := time.Now()
log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
func processMigrationJob(
ctx context.Context,
targetDbWrapper dbwrapper.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor,
transformer etl.Transformer,
loader etl.Loader,
job config.Job,
) JobResult {
localCtx, cancel := context.WithCancel(ctx)
defer cancel()
sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job)
if err != nil {
log.Fatal("Unexpected error: ", err)
result := JobResult{
JobName: job.Name,
StartTime: time.Now(),
}
logColumnTypes(sourceColTypes, "Source col types")
logColumnTypes(targetColTypes, "Target col types")
var rowsRead, rowsLoaded, rowsFailed int64
mssqlCtx := context.Background()
batchRanges, err := calculateBatchMetrics(mssqlCtx, sourceDb, job)
var wgQueryColumnTypes errgroup.Group
var sourceColTypes, targetColTypes []models.ColumnType
wgQueryColumnTypes.Go(func() error {
var err error
sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo)
if err != nil {
return err
}
return nil
})
wgQueryColumnTypes.Go(func() error {
var err error
targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo)
if err != nil {
return err
}
return nil
})
err := wgQueryColumnTypes.Wait()
if err != nil {
result.Error = err
return result
}
for _, query := range job.PreSQL {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err
return result
}
}
partitions, err := table_analyzers.PartitionRangeGenerator(
localCtx,
sourceTableAnalyzer,
job.SourceTable.TableInfo,
job.SourceTable.PrimaryKey,
job.RowsPerPartition,
)
if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err)
}
chBatchRanges := make(chan BatchRange, len(batchRanges))
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
chPartitions := make(chan models.Partition, job.QueueSize)
chBatchesRaw := make(chan models.Batch, job.QueueSize)
chBatchesTransformed := make(chan models.Batch, job.QueueSize)
maxExtractors := min(NumExtractors, len(batchRanges))
chRowsExtract := make(chan []UnknownRowValues, QueueSize)
var wgMssqlExtractors sync.WaitGroup
var wgActivePartitions sync.WaitGroup
var wgActiveBatches sync.WaitGroup
var wgExtractors sync.WaitGroup
var wgTransformers sync.WaitGroup
var wgLoaders sync.WaitGroup
go func() {
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
cancel()
result.Error = err
}
}()
go custom_errors.LoaderErrorHandler(
localCtx,
job.Retry,
job.MaxChunkErrors,
chLoadersErrors,
chBatchesTransformed,
chJobErrors,
&wgActiveBatches,
)
maxExtractors := min(job.MaxExtractors, len(partitions))
log.Infof("Starting %d extractor(s)...", maxExtractors)
log.Infof("Starting %d MSSQL extractors...", maxExtractors)
extractStartTime := time.Now()
for range maxExtractors {
wgMssqlExtractors.Go(func() {
for br := range chBatchRanges {
if err := extractFromMssql(mssqlCtx, sourceDb, job, sourceColTypes, ChunkSize, br, chRowsExtract); err != nil {
log.Error("Unexpected error extracting data from mssql: ", err)
}
}
wgExtractors.Go(func() {
extractors.Consume(
localCtx,
extractor,
job.SourceTable,
sourceColTypes,
job.BatchSize,
chPartitions,
chBatchesRaw,
chJobErrors,
&wgActivePartitions,
&rowsRead,
)
})
}
wgActivePartitions.Add(len(partitions))
go func() {
for _, br := range batchRanges {
chBatchRanges <- br
for _, batch := range partitions {
chPartitions <- batch
}
close(chBatchRanges)
}()
go func() {
wgMssqlExtractors.Wait()
close(chRowsExtract)
log.Infof("Extraction completed in %v", time.Since(extractStartTime))
}()
log.Infof("Starting %d transformer(s)...", maxExtractors)
chRowsTransform := make(chan []UnknownRowValues, QueueSize)
var wgMssqlTransformers sync.WaitGroup
log.Infof("Starting %d MSSQL transformers...", maxExtractors)
transformStartTime := time.Now()
for range maxExtractors {
wgMssqlTransformers.Go(func() {
transformRowsMssql(sourceColTypes, chRowsExtract, chRowsTransform)
wgTransformers.Go(func() {
transformer.Exec(
localCtx,
sourceColTypes,
chBatchesRaw,
chBatchesTransformed,
chJobErrors,
&wgActiveBatches,
)
})
}
log.Infof("Starting %d loader(s)...", job.MaxLoaders)
for range job.MaxLoaders {
wgLoaders.Go(func() {
loader.Exec(
localCtx,
job.TargetTable,
targetColTypes,
chBatchesTransformed,
chLoadersErrors,
chJobErrors,
&wgActiveBatches,
&rowsLoaded,
)
})
}
go func() {
wgMssqlTransformers.Wait()
close(chRowsTransform)
log.Infof("Transformation completed in %v", time.Since(transformStartTime))
log.Debugf("Waiting for goroutines (%v)", job.Name)
wgActivePartitions.Wait()
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
close(chPartitions)
log.Debugf("chPartitions is closed (%v)", job.Name)
wgExtractors.Wait()
log.Debugf("wgExtractors is empty (%v)", job.Name)
close(chBatchesRaw)
log.Debugf("chBatchesRaw is closed (%v)", job.Name)
wgTransformers.Wait()
log.Debugf("wgTransformers is empty (%v)", job.Name)
wgActiveBatches.Wait()
log.Debugf("wgActiveBatches is empty (%v)", job.Name)
close(chBatchesTransformed)
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
close(chLoadersErrors)
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
wgLoaders.Wait()
log.Debugf("wgLoaders is empty (%v)", job.Name)
cancel()
}()
var wgPostgresLoaders sync.WaitGroup
postgresLoaderCtx := context.Background()
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
loaderStartTime := time.Now()
for range NumLoaders {
wgPostgresLoaders.Go(func() {
if err := loadRowsPostgres(postgresLoaderCtx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
log.Error("Unexpected error loading data into postgres: ", err)
for _, query := range job.PostSQL {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err
return result
}
// fakeLoader(job, sourceColTypes, chRowsTransform)
})
}
wgPostgresLoaders.Wait()
log.Infof("Loading completed in %v", time.Since(loaderStartTime))
log.Debugf("waiting for local context to be done (%v)", job.Name)
<-localCtx.Done()
log.Debugf("local context done (%v)", job.Name)
totalDuration := time.Since(jobStartTime)
log.Infof("Migration job completed successfully! Total time: %v", totalDuration)
}
func logColumnTypes(columnTypes []ColumnType, label string) {
log.Debug(label)
for _, col := range columnTypes {
log.Debugf("%+v", col)
}
}
func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) {
log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag)
for i, col := range columns {
log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
}
if ctx.Err() != nil {
result.Error = ctx.Err()
}
result.Duration = time.Since(result.StartTime)
result.RowsRead = atomic.LoadInt64(&rowsRead)
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
result.RowsFailed = atomic.LoadInt64(&rowsFailed)
if result.RowsRead != result.RowsLoaded {
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
}
return result
}

View File

@@ -1,45 +0,0 @@
package main
import (
"time"
log "github.com/sirupsen/logrus"
)
func transformRowsMssql(columns []ColumnType, in <-chan []UnknownRowValues, out chan<- []UnknownRowValues) {
chunkCount := 0
totalRowsTransformed := 0
for rows := range in {
chunkStartTime := time.Now()
log.Debugf("Chunk #%d received, transforming %d rows...", chunkCount+1, len(rows))
for _, rowValues := range rows {
for i, col := range columns {
value := rowValues[i]
if col.SystemType() == "uniqueidentifier" {
if b, ok := value.([]byte); ok {
rowValues[i] = mssqlUuidToBigEndian(b)
}
} else if col.SystemType() == "geometry" || col.SystemType() == "geography" {
if b, ok := value.([]byte); ok {
rowValues[i] = wkbToEwkbWithSrid(b, 4326)
}
} else if col.SystemType() == "datetime" || col.SystemType() == "datetime2" {
if t, ok := value.(time.Time); ok {
rowValues[i] = ensureUTC(t)
}
}
}
}
chunkCount++
totalRowsTransformed += len(rows)
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
log.Infof("Transformed chunk #%d: %d rows in %v (%.0f rows/sec) - Total: %d rows",
chunkCount, len(rows), chunkDuration, rowsPerSec, totalRowsTransformed)
out <- rows
}
}

53
config.yaml Normal file
View File

@@ -0,0 +1,53 @@
max_parallel_workers: 4
source_db_type: sqlserver
target_db_type: postgres
defaults:
max_extractors: 2
max_loaders: 4
queue_size: 8
batch_size: 25000
batches_per_partition: 8
truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE
max_partition_errrors: 5
max_chunk_errors: 5
retry:
attempts: 3
base_delay_ms: 500
max_delay_ms: 10000
max_jitter_ms: 500
jobs:
- name: cartografia_manzana
enabled: true
source:
schema: Cartografia
table: MANZANA
primary_key: GDB_ARCHIVE_OID
target:
schema: Cartografia
table: MANZANA
pre_sql:
- 'SELECT 1'
# - 'TRUNCATE TABLE "Cartografia"."MANZANA"'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto
enabled: true
source:
schema: Red
table: PUERTO
primary_key: ID_PUERTO
target:
schema: Red
table: PUERTO
pre_sql:
- 'SELECT 1'
# - 'TRUNCATE TABLE "Red"."PUERTO"'
post_sql:
- "SELECT 1"

5
go.mod
View File

@@ -10,6 +10,8 @@ require (
github.com/microsoft/go-mssqldb v1.9.8
github.com/sirupsen/logrus v1.9.4
github.com/twpayne/go-geom v1.6.1
golang.org/x/sync v0.19.0
gopkg.in/yaml.v3 v3.0.1
)
require (
@@ -18,9 +20,10 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
)

9
go.sum
View File

@@ -16,6 +16,7 @@ github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZ
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -41,6 +42,10 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/microsoft/go-mssqldb v1.9.8 h1:d4IFMvF/o+HdpXUqbBfzHvn/NlFA75YGcfHUUvDFJEM=
@@ -49,6 +54,8 @@ github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmd
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w=
@@ -71,6 +78,8 @@ golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -9,9 +9,7 @@ import (
type appConfig struct {
SourceDbUrl string
SourceDbType string
TargetDbUrl string
TargetDbType string
}
func loadEnv() {
@@ -29,26 +27,14 @@ func getAppConfig() appConfig {
log.Fatal("SOURCE_DB_URL environment variable not set")
}
sourceDbType := os.Getenv("SOURCE_DB_TYPE")
if sourceDbType == "" {
log.Fatal("SOURCE_DB_TYPE environment variable not set")
}
targetDbUrl := os.Getenv("TARGET_DB_URL")
if targetDbUrl == "" {
log.Fatal("TARGET_DB_URL environment variable not set")
}
targetDbType := os.Getenv("TARGET_DB_TYPE")
if targetDbType == "" {
log.Fatal("TARGET_DB_TYPE environment variable not set")
}
return appConfig{
SourceDbUrl: sourceDbUrl,
SourceDbType: sourceDbType,
TargetDbUrl: targetDbUrl,
TargetDbType: targetDbType,
}
}

View File

@@ -0,0 +1,138 @@
package config
import (
"fmt"
"os"
"gopkg.in/yaml.v3"
)
type RetryConfig struct {
Attempts int `yaml:"attempts"`
BaseDelayMs int `yaml:"base_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"`
}
type JobConfig struct {
MaxExtractors int `yaml:"max_extractors"`
MaxLoaders int `yaml:"max_loaders"`
QueueSize int `yaml:"queue_size"`
BatchSize int `yaml:"batch_size"`
BatchesPerPartition int `yaml:"batches_per_partition"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
MaxChunkErrors int `yaml:"max_chunk_errors"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
}
type TableInfo struct {
Schema string `yaml:"schema"`
Table string `yaml:"table"`
}
type TargetTableInfo struct {
TableInfo `yaml:",inline"`
}
type SourceTableInfo struct {
TableInfo `yaml:",inline"`
PrimaryKey string `yaml:"primary_key"`
}
type Job struct {
Name string `yaml:"name"`
Enabled bool `yaml:"enabled"`
SourceTable SourceTableInfo `yaml:"source"`
TargetTable TargetTableInfo `yaml:"target"`
PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"`
JobConfig `yaml:",inline"`
Range struct {
Min int64 `yaml:"min"`
Max int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
}
type MigrationConfig struct {
MaxParallelWorkers int `yaml:"max_parallel_workers"`
SourceDbType string `yaml:"source_db_type"`
TargetDbType string `yaml:"target_db_type"`
Defaults JobConfig `yaml:"defaults"`
Jobs []Job `yaml:"jobs"`
}
type rawConfig struct {
MaxParallelWorkers int `yaml:"max_parallel_workers"`
SourceDbType string `yaml:"source_db_type"`
TargetDbType string `yaml:"target_db_type"`
Defaults JobConfig `yaml:"defaults"`
Jobs []yaml.Node `yaml:"jobs"`
}
func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
var raw rawConfig
if err := value.Decode(&raw); err != nil {
return err
}
c.MaxParallelWorkers = raw.MaxParallelWorkers
c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs {
job := Job{
JobConfig: raw.Defaults,
}
if err := node.Decode(&job); err != nil {
return err
}
job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition)
c.Jobs = append(c.Jobs, job)
}
return nil
}
const defaultConfigFileName string = "config.yaml"
func filenamesOrDefault(filenames []string) []string {
if len(filenames) == 0 {
return []string{defaultConfigFileName}
}
return filenames
}
func ReadMigrationConfig(filenames ...string) (MigrationConfig, error) {
filenames = filenamesOrDefault(filenames)
var data []byte
var err error
for _, filename := range filenames {
data, err = os.ReadFile(filename)
if err != nil {
continue
}
break
}
if err != nil {
return MigrationConfig{}, fmt.Errorf("Error reading config file: %v", err)
}
var config MigrationConfig
if err := yaml.Unmarshal(data, &config); err != nil {
return MigrationConfig{}, fmt.Errorf("Error parsing config file: %v", err)
}
return config, nil
}

View File

@@ -0,0 +1,18 @@
package convert
func ToInt64(v any) (int64, bool) {
switch t := v.(type) {
case int:
return int64(t), true
case int8:
return int64(t), true
case int16:
return int64(t), true
case int32:
return int64(t), true
case int64:
return int64(t), true
default:
return 0, false
}
}

View File

@@ -0,0 +1,61 @@
package custom_errors
import (
"context"
"math/rand"
"time"
)
func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
if retryCounter < 0 {
retryCounter = 0
}
delay := max(time.Duration(baseDelayMs)*time.Millisecond, 0)
maxDelay := time.Duration(maxDelayMs) * time.Millisecond
for i := 0; i < retryCounter; i++ {
if maxDelayMs > 0 && delay >= maxDelay {
delay = maxDelay
break
}
if delay == 0 {
break
}
delay *= 2
}
if maxDelayMs > 0 && delay > maxDelay {
delay = maxDelay
}
if maxJitterMs > 0 {
jitter := time.Duration(rand.Intn(maxJitterMs+1)) * time.Millisecond
delay += jitter
}
if delay < 0 {
delay = 0
}
return delay
}
func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) {
if delay <= 0 {
enqueue()
return
}
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return
case <-timer.C:
enqueue()
}
}()
}

View File

@@ -0,0 +1,16 @@
package custom_errors
import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type ExtractorError struct {
Partition models.Partition
LastId int64
HasLastId bool
Msg string
}
func (e *ExtractorError) Error() string {
return e.Msg
}

View File

@@ -0,0 +1,47 @@
package custom_errors
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
)
type JobError struct {
ShouldCancelJob bool
Msg string
Prev error
}
func (e *JobError) Error() string {
if e.Prev != nil {
return fmt.Sprintf("%s: %v", e.Msg, e.Prev)
}
return e.Msg
}
func JobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
for {
if ctx.Err() != nil {
return nil
}
select {
case <-ctx.Done():
return nil
case err, ok := <-chErrorsIn:
if !ok {
return nil
}
if err.ShouldCancelJob {
log.Errorf("(Fatal job error) - %v - %v", err.Msg, err.Prev)
return &err
}
log.Errorf("%v - %v", err.Msg, err.Prev)
}
}
}

View File

@@ -0,0 +1,107 @@
package custom_errors
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type LoaderError struct {
Batch models.Batch
Msg string
}
func (e *LoaderError) Error() string {
return e.Msg
}
func LoaderErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxChunkErrors int,
chErrorsIn <-chan LoaderError,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Batch.RetryCounter >= retryConfig.Attempts {
wgActiveBatches.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
err.Batch.RetryCounter++
delay := computeBackoffDelay(
err.Batch.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
requeueWithBackoff(ctx, delay, func() {
select {
case chBatchesOut <- err.Batch:
case <-ctx.Done():
return
}
})
}
}
}

View File

@@ -0,0 +1,19 @@
package dbwrapper
import "fmt"
type Factory func() DbWrapper
var drivers = make(map[string]Factory)
func Register(name string, factory Factory) {
drivers[name] = factory
}
func New(driverType string) (DbWrapper, error) {
factory, ok := drivers[driverType]
if !ok {
return nil, fmt.Errorf("driver not yet supported: %s", driverType)
}
return factory(), nil
}

View File

@@ -0,0 +1,176 @@
package dbwrapper
import (
"context"
"database/sql"
"fmt"
mssql "github.com/microsoft/go-mssqldb"
)
func init() {
Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
})
}
type mssqlRowResult struct {
row *sql.Row
}
func (mr *mssqlRowResult) Scan(dest ...any) error {
return mr.row.Scan(dest...)
}
type mssqlRowsResult struct {
columns []string
rows *sql.Rows
}
func (mr *mssqlRowsResult) Close() error {
return mr.rows.Close()
}
func (mr *mssqlRowsResult) Columns() ([]string, error) {
if mr.columns != nil {
return mr.columns, nil
}
return mr.rows.Columns()
}
func (mr *mssqlRowsResult) Err() error {
return mr.rows.Err()
}
func (mr *mssqlRowsResult) Next() bool {
return mr.rows.Next()
}
func (mr *mssqlRowsResult) Scan(dest ...any) error {
return mr.rows.Scan(dest...)
}
func (mr *mssqlRowsResult) Values() ([]any, error) {
columns, err := mr.Columns()
if err != nil {
return nil, err
}
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := mr.rows.Scan(scanArgs...); err != nil {
return nil, err
}
return rowValues, nil
}
type mssqlDbWrapper struct {
db *sql.DB
dialect string
}
func (mw *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error {
db, err := sql.Open("sqlserver", dbUrl)
if err != nil {
return err
}
if err := db.PingContext(ctx); err != nil {
if err := db.Close(); err != nil {
return err
}
return err
}
mw.db = db
return nil
}
func (mw *mssqlDbWrapper) Close() error {
return mw.db.Close()
}
func (mw *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, execErr := mw.db.ExecContext(ctx, query, args...)
if execErr != nil {
return ExecResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: affectedRows}, nil
}
func (mw *mssqlDbWrapper) GetDialect() string {
return mw.dialect
}
func (mw *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := mw.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &mssqlRowsResult{columns: nil, rows: rows}, nil
}
func (mw *mssqlDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := mw.db.QueryRowContext(ctx, query, args...)
return &mssqlRowResult{row: row}
}
func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
tx, err := mw.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
fullTableName := fmt.Sprintf("[%s].[%s]", schema, table)
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, columnNames...))
if err != nil {
tx.Rollback()
return 0, err
}
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
if err := stmt.Close(); err != nil {
tx.Rollback()
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
rowsAffected, raErr := result.RowsAffected()
if raErr != nil {
return 0, nil
}
return rowsAffected, nil
}

View File

@@ -0,0 +1,128 @@
package dbwrapper
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func init() {
Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
})
}
type postgresRowResult struct {
row pgx.Row
}
func (pr *postgresRowResult) Scan(dest ...any) error {
return pr.row.Scan(dest...)
}
type postgresRowsResult struct {
columns []string
rows pgx.Rows
}
func (pr *postgresRowsResult) Close() error {
pr.rows.Close()
return nil
}
func (pr *postgresRowsResult) Columns() ([]string, error) {
if pr.columns != nil {
return pr.columns, nil
}
rawColumns := pr.rows.FieldDescriptions()
if rawColumns == nil {
return nil, errors.New("error retrieving columns")
}
columns := make([]string, 0, len(rawColumns))
for _, rc := range rawColumns {
columns = append(columns, rc.Name)
}
return columns, nil
}
func (pr *postgresRowsResult) Err() error {
return pr.rows.Err()
}
func (pr *postgresRowsResult) Next() bool {
return pr.rows.Next()
}
func (pr *postgresRowsResult) Scan(dest ...any) error {
return pr.rows.Scan(dest...)
}
func (pr *postgresRowsResult) Values() ([]any, error) {
return pr.rows.Values()
}
type postgresDbWrapper struct {
db *pgxpool.Pool
dialect string
}
func (pw *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error {
pool, err := pgxpool.New(ctx, dbUrl)
if err != nil {
return err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return err
}
pw.db = pool
return nil
}
func (pw *postgresDbWrapper) Close() error {
pw.db.Close()
return nil
}
func (pw *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, err := pw.db.Exec(ctx, query, args...)
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: result.RowsAffected()}, nil
}
func (pw *postgresDbWrapper) GetDialect() string {
return pw.dialect
}
func (pw *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := pw.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
return &postgresRowsResult{columns: nil, rows: rows}, nil
}
func (pw *postgresDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := pw.db.QueryRow(ctx, query, args...)
return &postgresRowResult{row: row}
}
func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
affectedRows, err := pw.db.CopyFrom(ctx, pgx.Identifier{schema, table}, columnNames, pgx.CopyFromRows(rows))
if err != nil {
return 0, err
}
return affectedRows, nil
}

View File

@@ -0,0 +1,35 @@
package dbwrapper
import (
"context"
"errors"
)
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
type ExecResult struct {
AffectedRows int64
}
type RowsResult interface {
Close() error
Columns() ([]string, error)
Err() error
Next() bool
Scan(dest ...any) error
Values() ([]any, error)
}
type RowResult interface {
Scan(dest ...any) error
}
type DbWrapper interface {
Close() error
Connect(ctx context.Context, dbUrl string) error
Exec(ctx context.Context, query string, args ...any) (ExecResult, error)
GetDialect() string
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
}

View File

@@ -1,28 +0,0 @@
package db
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}

View File

@@ -0,0 +1,92 @@
package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
func Consume(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := extractWithRetries(
ctx,
extractor,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
)
wgActivePartitions.Done()
if rowsReadResult > 0 {
atomic.AddInt64(rowsRead, rowsReadResult)
}
if err != nil {
var jobError *custom_errors.JobError
if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}
}
}
}
}

View File

@@ -0,0 +1,70 @@
package extractors
import (
"context"
"errors"
"fmt"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func extractWithRetries(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
delay := time.Duration(time.Second * 1)
currentParitition := partition
for {
rowsRead, err := extractor.Exec(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
totalRowsRead += rowsRead
if err == nil {
return totalRowsRead, nil
}
var exError *custom_errors.ExtractorError
if errors.As(err, &exError) {
currentParitition.RetryCounter++
if currentParitition.RetryCounter > 3 {
return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
Prev: err,
}
}
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
time.Sleep(delay)
continue
}
return totalRowsRead, err
}
}

View File

@@ -0,0 +1,64 @@
package extractors
import (
"context"
"fmt"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func errorFromLastPartitionRow(
lastRow models.UnknownRowValues,
indexPrimaryKey int,
partition models.Partition,
previousError error,
) error {
lastIdRawValue := lastRow[indexPrimaryKey]
lastId, ok := convert.ToInt64(lastIdRawValue)
if !ok {
currentPartition := partition
currentPartition.RetryCounter = 3
return &custom_errors.ExtractorError{
Partition: currentPartition,
HasLastId: true,
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
}
}
return &custom_errors.ExtractorError{
Partition: partition,
HasLastId: true,
LastId: lastId,
Msg: previousError.Error(),
}
}
func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error {
select {
case chBatchesOut <- batch:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func flush(
ctx context.Context,
partition *models.Partition,
batchSize int,
batchRows []models.UnknownRowValues,
chBatchesOut chan<- models.Batch,
) error {
if len(batchRows) == 0 {
return nil
}
batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
return sendBatch(ctx, chBatchesOut, batch)
}

View File

@@ -0,0 +1,121 @@
package extractors
import (
"context"
"database/sql"
"fmt"
"strings"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type MssqlExtractor struct {
db dbwrapper.DbWrapper
}
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &MssqlExtractor{db: db}
}
func buildExtractQueryMssql(
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
includeRange bool,
isMinInclusive bool,
) string {
var sbQuery strings.Builder
sbQuery.WriteString("SELECT ")
if len(columns) == 0 {
sbQuery.WriteString("*")
} else {
for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
if col.Type() == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
}
if i < len(columns)-1 {
sbQuery.WriteString(", ")
}
}
}
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
if includeRange {
fmt.Fprintf(&sbQuery, " WHERE [%s]", tableInfo.PrimaryKey)
if isMinInclusive {
sbQuery.WriteString(" >=")
} else {
sbQuery.WriteString(" >")
}
fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", tableInfo.PrimaryKey)
}
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
return sbQuery.String()
}
func (mssqlEx *MssqlExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)
var queryArgs []any
if partition.HasRange {
queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min), sql.Named("max", partition.Range.Max))
}
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
if err != nil {
return 0, err
}
defer rows.Close()
batchRows := make([]models.UnknownRowValues, 0, batchSize)
var rowsRead int64 = 0
for rows.Next() {
values, err := rows.Values()
if err != nil {
if len(batchRows) == 0 {
return rowsRead, err
}
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
return rowsRead, err
}
lastRow := batchRows[len(batchRows)-1]
return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err)
}
rowsRead++
batchRows = append(batchRows, values)
if len(batchRows) >= batchSize {
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
return rowsRead, err
}
}
}
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
return rowsRead, err
}
return rowsRead, rows.Err()
}

View File

@@ -0,0 +1,110 @@
package extractors
import (
"context"
"errors"
"fmt"
"strings"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type PostgresExtractor struct {
db dbwrapper.DbWrapper
}
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &PostgresExtractor{db: db}
}
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
var sbColumns strings.Builder
if len(columns) == 0 {
sbColumns.WriteString("*")
} else {
for i, col := range columns {
if col.Type() == "GEOMETRY" {
sbColumns.WriteString(`ST_AsEWKB("`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`") AS "`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`"`)
} else {
sbColumns.WriteString(`"`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`"`)
}
if i < len(columns)-1 {
sbColumns.WriteString(", ")
}
}
}
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
}
func (postgresEx *PostgresExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
query := buildExtractQueryPostgres(tableInfo, columns)
if partition.HasRange {
return 0, errors.New("Batch config not yet supported")
}
var rowsRead int64 = 0
rows, err := postgresEx.db.Query(ctx, query)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
defer rows.Close()
batchRows := make([]models.UnknownRowValues, 0, batchSize)
for rows.Next() {
values, err := rows.Values()
if err != nil {
return rowsRead, errors.New("Unexpected error reading rows from source")
}
rowsRead++
batchRows = append(batchRows, values)
if len(batchRows) >= batchSize {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
}
}
if err := rows.Err(); err != nil {
return rowsRead, errors.New("Unexpected error reading rows from source")
}
if len(batchRows) > 0 {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, nil
}
}
return rowsRead, nil
}

View File

@@ -0,0 +1,127 @@
package loaders
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgconn"
)
type PostgresLoader struct {
db dbwrapper.DbWrapper
}
func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
return &PostgresLoader{db: db}
}
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func (postgresLd *PostgresLoader) ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int, error) {
_, err := postgresLd.db.SaveMassive(
ctx,
tableInfo.Schema,
tableInfo.Table,
colNames,
batch.Rows,
)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == "23505" {
return 0, &custom_errors.JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
Prev: err,
}
}
}
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
}
return len(batch.Rows), nil
}
func (postgresLd *PostgresLoader) Exec(
ctx context.Context,
tableInfo config.TargetTableInfo,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chErrorsOut chan<- custom_errors.LoaderError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
rowsLoaded *int64,
) {
colNames := mapSlice(columns, func(col models.ColumnType) string {
return col.Name()
})
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
processedRows, err := postgresLd.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil {
var ldError *custom_errors.LoaderError
var jobError *custom_errors.JobError
if errors.As(err, &ldError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *ldError:
}
} else if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
}
}
continue
}
wgActiveBatches.Done()
atomic.AddInt64(rowsLoaded, int64(processedRows))
}
}
}

View File

@@ -0,0 +1 @@
package loaders

View File

@@ -0,0 +1,40 @@
package table_analyzers
import (
"context"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func PartitionRangeGenerator(
ctx context.Context,
tableAnalyzer etl.TableAnalyzer,
tableInfo config.TableInfo,
partitionColumn string,
rowsPerPartition int64,
) ([]models.Partition, error) {
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
if err != nil {
return nil, err
}
if rowsCount <= rowsPerPartition {
return []models.Partition{{
Id: uuid.New(),
HasRange: false,
RetryCounter: 0,
}}, nil
}
partitionsCount := rowsCount / rowsPerPartition
partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount)
if err != nil {
return nil, err
}
return partitions, nil
}

View File

@@ -0,0 +1,252 @@
package table_analyzers
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type MssqlTableAnalyzer struct {
db dbwrapper.DbWrapper
}
func NewMssqlTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
return &MssqlTableAnalyzer{db: db}
}
const mssqlColumnMetadataQuery string = `
SELECT
c.name AS name,
t.name AS user_type,
CASE WHEN t.is_user_defined = 0 THEN t.name ELSE bt.name END AS system_type,
c.is_nullable AS nullable,
c.max_length AS max_length,
c.precision AS precision,
c.scale AS scale
FROM sys.columns c
JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;`
type rawColumnMssql struct {
name string
userType string
systemType string
nullable bool
maxLength int64
precision int64
scale int64
}
func (ta *MssqlTableAnalyzer) systemTypeToUnifiedType(systemType string) string {
systemType = strings.ToLower(systemType)
if systemType == "varchar" || systemType == "char" || systemType == "nvarchar" || systemType == "nchar" || systemType == "text" || systemType == "ntext" {
return "STRING"
}
if systemType == "int" || systemType == "int4" || systemType == "integer" || systemType == "smallint" || systemType == "int2" || systemType == "bigint" || systemType == "int8" || systemType == "tinyint" {
return "INTEGER"
}
if systemType == "decimal" || systemType == "numeric" {
return "DECIMAL"
}
if systemType == "float" || systemType == "real" || systemType == "double precision" {
return "FLOAT"
}
if systemType == "bit" || systemType == "boolean" {
return "BOOLEAN"
}
if systemType == "date" {
return "DATE"
}
if systemType == "time" || systemType == "time without time zone" {
return "TIME"
}
if systemType == "datetime" || systemType == "datetime2" || systemType == "timestamp" || systemType == "timestamptz" || systemType == "timestamp with time zone" {
return "TIMESTAMP"
}
if systemType == "binary" || systemType == "varbinary" || systemType == "image" || systemType == "bytea" {
return "BINARY"
}
if systemType == "uniqueidentifier" || systemType == "uuid" {
return "UUID"
}
if systemType == "json" {
return "JSON"
}
if systemType == "geometry" || systemType == "geography" {
return "GEOMETRY"
}
return strings.ToUpper(systemType)
}
func (ta *MssqlTableAnalyzer) rawColumnToColumnType(rawColumn rawColumnMssql) models.ColumnType {
const nullValue int64 = -1
stringTypes := map[string]bool{"varchar": true, "char": true, "nvarchar": true, "nchar": true, "text": true, "ntext": true}
decimalTypes := map[string]bool{"decimal": true, "numeric": true}
if stringTypes[rawColumn.systemType] {
if rawColumn.systemType == "nvarchar" || rawColumn.systemType == "nchar" {
if rawColumn.maxLength > 0 {
rawColumn.maxLength = rawColumn.maxLength / 2
}
}
rawColumn.precision, rawColumn.scale = nullValue, nullValue
} else if decimalTypes[rawColumn.systemType] {
rawColumn.maxLength = nullValue
} else {
rawColumn.maxLength, rawColumn.precision, rawColumn.scale = nullValue, nullValue, nullValue
}
columnType := models.NewColumnType(
rawColumn.name,
rawColumn.maxLength != nullValue,
rawColumn.precision != nullValue || rawColumn.scale != nullValue,
rawColumn.userType,
rawColumn.systemType,
ta.systemTypeToUnifiedType(rawColumn.systemType),
rawColumn.nullable,
rawColumn.maxLength,
rawColumn.precision,
rawColumn.scale,
)
return columnType
}
func (ta *MssqlTableAnalyzer) QueryColumnTypes(
ctx context.Context,
tableInfo config.TableInfo,
) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
rows, err := ta.db.Query(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
if err != nil {
return nil, err
}
defer rows.Close()
var columnTypes []models.ColumnType
for rows.Next() {
var rawColumn rawColumnMssql
if err := rows.Scan(
&rawColumn.name,
&rawColumn.userType,
&rawColumn.systemType,
&rawColumn.nullable,
&rawColumn.maxLength,
&rawColumn.precision,
&rawColumn.scale,
); err != nil {
return nil, err
}
columnTypes = append(columnTypes, ta.rawColumnToColumnType(rawColumn))
}
return columnTypes, nil
}
func (ta *MssqlTableAnalyzer) EstimateTotalRows(
ctx context.Context,
tableInfo config.TableInfo,
) (int64, error) {
query := `
SELECT SUM(p.rows) AS count
FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id
JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name`
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var rowsCount int64
err := ta.db.QueryRow(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
if err != nil {
return 0, err
}
return rowsCount, nil
}
func (ta *MssqlTableAnalyzer) CalculatePartitionRanges(
ctx context.Context,
tableInfo config.TableInfo,
partitionColumn string,
maxPartitions int64,
) ([]models.Partition, error) {
query := fmt.Sprintf(`
SELECT
MIN([%s]) AS lower_limit,
MAX([%s]) AS upper_limit
FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T
GROUP BY batch_id
ORDER BY batch_id`,
partitionColumn,
partitionColumn,
partitionColumn,
partitionColumn,
tableInfo.Schema,
tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
if err != nil {
return nil, err
}
defer rows.Close()
partitions := make([]models.Partition, 0, maxPartitions)
for rows.Next() {
partition := models.Partition{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
IsMinInclusive: true,
},
}
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
return nil, err
}
partitions = append(partitions, partition)
}
if err := rows.Err(); err != nil {
return nil, err
}
return partitions, nil
}

View File

@@ -0,0 +1,174 @@
package table_analyzers
import (
"context"
"strings"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type PostgresTableAnalyzer struct {
db dbwrapper.DbWrapper
}
func NewPostgresTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
return &PostgresTableAnalyzer{db: db}
}
const postgresColumnMetadataQuery string = `
SELECT
c.column_name AS name,
c.data_type AS user_type,
c.udt_name AS system_type,
(CASE WHEN c.is_nullable = 'YES' THEN TRUE ELSE FALSE END) AS nullable,
COALESCE(c.character_maximum_length, -1) AS max_length,
COALESCE(c.numeric_precision, -1) AS precision,
COALESCE(c.numeric_scale, -1) AS scale
FROM information_schema.columns c
WHERE c.table_schema = $1 AND c.table_name = $2
ORDER BY c.ordinal_position;`
type rawColumnPostgres struct {
name string
userType string
systemType string
nullable bool
maxLength int64
precision int64
scale int64
}
func (ta *PostgresTableAnalyzer) systemTypeToUnifiedType(systemType string) string {
systemType = strings.ToLower(systemType)
if systemType == "varchar" || systemType == "char" || systemType == "nvarchar" || systemType == "nchar" || systemType == "text" || systemType == "ntext" {
return "STRING"
}
if systemType == "int" || systemType == "int4" || systemType == "integer" || systemType == "smallint" || systemType == "int2" || systemType == "bigint" || systemType == "int8" || systemType == "tinyint" {
return "INTEGER"
}
if systemType == "decimal" || systemType == "numeric" {
return "DECIMAL"
}
if systemType == "float" || systemType == "real" || systemType == "double precision" {
return "FLOAT"
}
if systemType == "bit" || systemType == "boolean" {
return "BOOLEAN"
}
if systemType == "date" {
return "DATE"
}
if systemType == "time" || systemType == "time without time zone" {
return "TIME"
}
if systemType == "datetime" || systemType == "datetime2" || systemType == "timestamp" || systemType == "timestamptz" || systemType == "timestamp with time zone" {
return "TIMESTAMP"
}
if systemType == "binary" || systemType == "varbinary" || systemType == "image" || systemType == "bytea" {
return "BINARY"
}
if systemType == "uniqueidentifier" || systemType == "uuid" {
return "UUID"
}
if systemType == "json" {
return "JSON"
}
if systemType == "geometry" || systemType == "geography" {
return "GEOMETRY"
}
return strings.ToUpper(systemType)
}
func (ta *PostgresTableAnalyzer) rawColumnToColumnType(rawColumn rawColumnPostgres) models.ColumnType {
const nullValue int64 = -1
stringTypes := map[string]bool{"varchar": true, "char": true, "text": true}
decimalTypes := map[string]bool{"decimal": true, "numeric": true}
if stringTypes[rawColumn.systemType] {
rawColumn.precision, rawColumn.scale = nullValue, nullValue
} else if decimalTypes[rawColumn.systemType] {
rawColumn.maxLength = nullValue
} else {
rawColumn.maxLength, rawColumn.precision, rawColumn.scale = nullValue, nullValue, nullValue
}
return models.NewColumnType(
rawColumn.name,
rawColumn.maxLength != nullValue,
rawColumn.precision != nullValue || rawColumn.scale != nullValue,
rawColumn.userType,
rawColumn.systemType,
ta.systemTypeToUnifiedType(rawColumn.systemType),
rawColumn.nullable,
rawColumn.maxLength,
rawColumn.precision,
rawColumn.scale,
)
}
func (ta *PostgresTableAnalyzer) QueryColumnTypes(
ctx context.Context,
tableInfo config.TableInfo,
) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table)
if err != nil {
return nil, err
}
defer rows.Close()
var colTypes []models.ColumnType
for rows.Next() {
var column rawColumnPostgres
if err := rows.Scan(
&column.name,
&column.userType,
&column.systemType,
&column.nullable,
&column.maxLength,
&column.precision,
&column.scale,
); err != nil {
return nil, err
}
colTypes = append(colTypes, ta.rawColumnToColumnType(column))
}
return colTypes, nil
}
func (ta *PostgresTableAnalyzer) EstimateTotalRows(
ctx context.Context,
tableInfo config.TableInfo,
) (int64, error) {
return 0, nil
}
func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
ctx context.Context,
tableInfo config.TableInfo,
partitionColumn string,
maxPartitions int64,
) ([]models.Partition, error) {
return []models.Partition{}, nil
}

View File

@@ -0,0 +1,154 @@
package transformers
import (
"context"
"errors"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type MssqlTransformer struct{}
func NewMssqlTransformer() etl.Transformer {
return &MssqlTransformer{}
}
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
var plan []etl.ColumnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
}
const processBatchCtxCheck = 4096
func (mssqlTr *MssqlTransformer) ProcessBatch(
ctx context.Context,
batch *models.Batch,
transformationPlan []etl.ColumnTransformPlan,
) error {
for i, rowValues := range batch.Rows {
if i%processBatchCtxCheck == 0 {
if err := ctx.Err(); err != nil {
return err
}
}
if rowValues == nil {
continue
}
for _, task := range transformationPlan {
val := rowValues[task.Index]
if val == nil {
continue
}
transformed, err := task.Fn(val)
if err != nil {
return err
}
rowValues[task.Index] = transformed
}
}
return nil
}
func (mssqlTr *MssqlTransformer) Exec(
ctx context.Context,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
) {
transformationPlan := computeTransformationPlan(columns)
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
if len(transformationPlan) == 0 {
select {
case chBatchesOut <- batch:
wgActiveBatches.Add(1)
continue
case <-ctx.Done():
return
}
}
err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
if err != nil {
if errors.Is(err, ctx.Err()) {
return
}
select {
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
}
return
}
select {
case chBatchesOut <- batch:
case <-ctx.Done():
return
}
wgActiveBatches.Add(1)
}
}
}

View File

@@ -0,0 +1 @@
package transformers

View File

@@ -1,28 +1,30 @@
package main
package transformers
import (
"encoding/binary"
"errors"
"time"
)
func mssqlUuidToBigEndian(mssqlUuid []byte) []byte {
func mssqlUuidToBigEndian(mssqlUuid []byte) ([]byte, error) {
if len(mssqlUuid) != 16 {
return mssqlUuid
return nil, errors.New("Invalid uuid")
}
pgUuid := make([]byte, 16)
pgUuid[0], pgUuid[1], pgUuid[2], pgUuid[3] = mssqlUuid[3], mssqlUuid[2], mssqlUuid[1], mssqlUuid[0]
pgUuid[4], pgUuid[5] = mssqlUuid[5], mssqlUuid[4]
pgUuid[6], pgUuid[7] = mssqlUuid[7], mssqlUuid[6]
copy(pgUuid[8:], mssqlUuid[8:])
return pgUuid
return pgUuid, nil
}
const sridFlag = 0x20000000
func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
func wkbToEwkbWithSrid(geometry []byte, srid int) ([]byte, error) {
if len(geometry) < 5 {
return geometry
return nil, errors.New("Invalid wkb")
}
var byteOrder binary.ByteOrder
@@ -34,7 +36,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
wkbType := byteOrder.Uint32(geometry[1:5])
if wkbType&sridFlag != 0 {
return geometry
return geometry, nil
}
ewkbType := wkbType | sridFlag
@@ -49,7 +51,7 @@ func wkbToEwkbWithSrid(geometry []byte, srid int) []byte {
copy(result[9:], geometry[5:])
return result
return result, nil
}
func ensureUTC(t time.Time) time.Time {
@@ -59,3 +61,20 @@ func ensureUTC(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
func ToInt64(v any) (int64, bool) {
switch t := v.(type) {
case int:
return int64(t), true
case int8:
return int64(t), true
case int16:
return int64(t), true
case int32:
return int64(t), true
case int64:
return int64(t), true
default:
return 0, false
}
}

85
internal/app/etl/types.go Normal file
View File

@@ -0,0 +1,85 @@
package etl
import (
"context"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type Extractor interface {
Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error)
}
type TransformerFunc func(any) (any, error)
type ColumnTransformPlan struct {
Index int
Fn TransformerFunc
}
type Transformer interface {
ProcessBatch(
ctx context.Context,
batch *models.Batch,
transformationPlan []ColumnTransformPlan,
) error
Exec(
ctx context.Context,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chBactchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
)
}
type Loader interface {
ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int, error)
Exec(
ctx context.Context,
tableInfo config.TargetTableInfo,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chErrorsOut chan<- custom_errors.LoaderError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
rowsLoaded *int64,
)
}
type TableAnalyzer interface {
QueryColumnTypes(
ctx context.Context,
tableInfo config.TableInfo,
) ([]models.ColumnType, error)
EstimateTotalRows(
ctx context.Context,
tableInfo config.TableInfo,
) (int64, error)
CalculatePartitionRanges(
ctx context.Context,
tableInfo config.TableInfo,
partitionColumn string,
maxPartitions int64,
) ([]models.Partition, error)
}

View File

@@ -1,4 +1,4 @@
package main
package models
type ColumnType struct {
name string
@@ -42,3 +42,29 @@ func (c *ColumnType) Nullable() bool {
func (c *ColumnType) Type() string {
return c.unifiedType
}
func NewColumnType(
name string,
hasMaxLength bool,
hasPrecisionScale bool,
userType string,
systemType string,
unifiedType string,
nullable bool,
maxLength int64,
precision int64,
scale int64,
) ColumnType {
return ColumnType{
name,
hasMaxLength,
hasPrecisionScale,
userType,
systemType,
unifiedType,
nullable,
maxLength,
precision,
scale,
}
}

View File

@@ -0,0 +1,27 @@
package models
import "github.com/google/uuid"
type UnknownRowValues = []any
type Batch struct {
Id uuid.UUID
PartitionId uuid.UUID
Rows []UnknownRowValues
RetryCounter int
}
type PartitionRange struct {
Min int64
Max int64
IsMinInclusive bool
IsMaxInclusive bool
}
type Partition struct {
Id uuid.UUID
ParentId uuid.UUID
Range PartitionRange
HasRange bool
RetryCounter int
}

View File

@@ -0,0 +1,17 @@
package main
import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
log "github.com/sirupsen/logrus"
)
func main() {
log.SetLevel(log.DebugLevel)
migrationConfig, err := config.ReadMigrationConfig()
if err != nil {
log.Fatalf("error leyendo configuracion: %v", err)
}
log.Debugf("Config: %+v", migrationConfig)
}

View File

@@ -8,13 +8,32 @@ import (
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
func main() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
@@ -27,8 +46,8 @@ func main() {
ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
sourcePool, err := db.Connect(ctxSource, config.App.SourceDbUrl)
defer db.Close(sourcePool)
sourcePool, err := Connect(ctxSource, config.App.SourceDbUrl)
defer Close(sourcePool)
if err != nil {
log.Fatal(err)
}
@@ -37,8 +56,8 @@ func main() {
ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
targetPool, err := db.Connect(ctxTarget, config.App.TargetDbUrl)
defer db.Close(targetPool)
targetPool, err := Connect(ctxTarget, config.App.TargetDbUrl)
defer Close(targetPool)
if err != nil {
log.Fatal(err)
}