58 Commits

Author SHA1 Message Date
6ad25e5889 feat: update job configuration for infraestructura_site_holder__attach; enhance storage handling and retry logic 2026-04-22 15:23:19 -05:00
0ac5f01b65 feat: update configuration handling to use cleanenv; remove unused dependencies and improve error logging 2026-04-22 12:31:31 -05:00
3b1371a270 feat: integrate Azure storage handling in migration process; update transformers and job processing logic 2026-04-21 14:15:20 -05:00
bb7b35619a feat: add ToStorage configuration to JobConfig for enhanced data handling 2026-04-21 13:43:03 -05:00
cd2efb8692 feat: add new job configuration for red_terminal__attach with source and target definitions 2026-04-21 13:38:11 -05:00
9964ef819b feat: refactor migration job to use preSQL and postSQL from TargetTable; update config structure for job definitions 2026-04-21 12:30:40 -05:00
bd51223855 feat: add truncate query handling in migration process; update job configuration to remove commented truncate SQL 2026-04-21 11:40:02 -05:00
aa71eeb5c1 feat: enhance range handling in MSSQL and Postgres extractors; update partition range generator logic 2026-04-21 11:32:52 -05:00
9eb8800864 feat: add range configuration to job and update extractors for inclusive range handling 2026-04-21 11:29:34 -05:00
09bd364976 feat: implement Azure Blob Storage client and refactor configuration structure 2026-04-21 11:03:56 -05:00
f2e6edd8fa feat: add caarlos0/env package for environment variable management and refactor appConfig structure 2026-04-21 10:41:38 -05:00
c5a18f6d95 chore: update .env.example to set LOG_LEVEL to INFO and add Azure storage configuration variables 2026-04-21 10:15:01 -05:00
8217b13d08 feat: add azblob client implementation for file uploads 2026-04-21 10:01:42 -05:00
16c232762e feat: refactor extractor interface and implement Consume function for ETL process 2026-04-20 00:19:41 -05:00
b4a846575d Update .env.example with new variables 2026-04-19 23:08:09 -05:00
5bd730f026 feat: update .gitignore to include .vscode directory 2026-04-19 23:06:18 -05:00
7bd80d4180 feat: enhance logging configuration to use dynamic log level from environment variable 2026-04-19 23:06:13 -05:00
846a49d40c feat: implement GenericLoader for batch processing and utility functions 2026-04-17 15:58:08 -05:00
93b302db8e feat: refactor job result handling and remove unused files 2026-04-16 16:47:35 -05:00
39c0d99502 feat: extend context timeout to 1 minute for database queries in mssql.go and postgres.go 2026-04-16 13:19:46 -05:00
b418ded78b feat: update SQL query to filter out specific column names in mssql.go 2026-04-16 13:14:39 -05:00
0d0511716f feat: add row count mismatch error handling in processMigrationJob and update SQL query to exclude additional graph-related columns 2026-04-16 12:46:55 -05:00
67fb0148ae feat: add Makefile for building binaries across platforms 2026-04-16 11:51:05 -05:00
098cf36e3c feat: comment out TRUNCATE statements in pre_sql for MANZANA and PUERTO jobs 2026-04-16 11:46:24 -05:00
5484716b81 feat: add source and target database type fields to MigrationConfig 2026-04-16 09:08:21 -05:00
df4c3bc390 feat: refactor db handling to use db-wrapper package; enhance connection management and result handling for MSSQL and Postgres 2026-04-16 08:48:29 -05:00
ea41a7c218 feat: register MSSQL and Postgres drivers in db-wrapper for improved factory pattern support 2026-04-15 23:09:56 -05:00
f09284ecdc feat: enhance db-wrapper with improved MSSQL and Postgres implementations; add row result handling and dialect support 2026-04-15 22:55:14 -05:00
0384d5423f feat: add range configuration to job settings for enhanced data processing control 2026-04-15 20:23:45 -05:00
1ce3d9e153 refactor: update partition handling to use Range struct for better clarity and consistency 2026-04-15 20:23:45 -05:00
DiegoAlessandroMotta
ed889b740a add db-wrapper package types 2026-04-15 20:22:23 -05:00
803f8988b8 refactor: update extractor interfaces to return row counts instead of using pointers for rows read 2026-04-13 19:25:18 -05:00
33c9cd9c3e feat: implement database wrapper interfaces for MSSQL and Postgres; enhance migration job processing with pre and post SQL execution 2026-04-13 07:57:18 -05:00
85074da2ec feat: add max partition and chunk error limits to extractor and loader error handlers 2026-04-12 20:57:31 -05:00
f126d5bbd0 feat: implement exponential backoff strategy for error handling in extractor and loader processes; enhance retry configuration options 2026-04-12 20:35:29 -05:00
5633dc98d0 fix: enhance error handling in extractor and loader processes; ensure proper job error propagation and logging 2026-04-12 20:11:19 -05:00
01780b4b02 refactor: remove unused ColumnType and inspect-columns files; update migration job to use separate table analyzers for source and target databases 2026-04-12 19:16:14 -05:00
aded502ee4 feat: implement Postgres table analyzer with column type querying and metadata retrieval 2026-04-12 13:38:36 -05:00
4d3cd6e4cf feat: add MSSQL table analyzer and integrate partition range generation for improved data migration 2026-04-11 01:23:13 -05:00
7830ae862d refactor: rename batch-related variables and functions for consistency and clarity 2026-04-11 00:44:12 -05:00
955bc65ce9 refactor: rename Batch to Partition in error handling and processing functions for consistency 2026-04-11 00:32:50 -05:00
9eb9821daf refactor: rename Batch to Partition and update related types and channels for consistency 2026-04-11 00:09:28 -05:00
cd0e53b1d2 feat: implement MSSQL extractor, transformer, and Postgres loader for enhanced data migration 2026-04-10 23:39:37 -05:00
1be7018ba3 feat: refactor configuration to include source and target database types 2026-04-10 22:58:57 -05:00
a5b5a04feb feat: update extractor and transformer constructors to return Extractor interface 2026-04-10 20:42:16 -05:00
c1bae79f98 feat: implement Postgres loader and refactor migration job processing 2026-04-10 20:40:01 -05:00
053e6bd673 feat: add MSSQL extractor and transformer implementations for improved data migration 2026-04-10 19:59:44 -05:00
eb3c3bbfce feat: refactor error handling to use custom_errors.LoaderError for improved error management 2026-04-10 19:35:38 -05:00
9493a2d32f feat: refactor error handling to accept max retry attempts as a parameter for improved flexibility 2026-04-10 19:32:12 -05:00
d228a048b8 feat: update extractor error handling to use models.UnknownRowValues for improved type consistency 2026-04-10 19:29:07 -05:00
ca621352c9 feat: refactor models to improve type handling and enhance error management across migration processes 2026-04-10 19:27:27 -05:00
c2ea84bfcf feat: implement extractor error handling and batch processing for MSSQL and Postgres 2026-04-10 19:06:41 -05:00
6345a0d694 feat: enhance migration job processing with detailed metrics and error handling 2026-04-09 21:55:19 -05:00
1db35c796c feat: enhance migration job processing with parallel execution and improved logging 2026-04-09 20:02:04 -05:00
0d9f955b2f feat: enhance batch processing by adding rowsPerBatch parameter and improving logging messages 2026-04-09 19:46:45 -05:00
524d892a60 feat: refactor migration job structure to use SourceTableInfo and TargetTableInfo for improved configuration handling 2026-04-09 19:20:50 -05:00
e8ace6ecf9 feat: implement job configuration structure and YAML parsing for migration jobs 2026-04-09 18:12:11 -05:00
adbc962464 feat: add configuration parsing and job management in YAML format 2026-04-09 17:47:58 -05:00
49 changed files with 3059 additions and 1477 deletions

24
.atl/skill-registry.md Normal file
View File

@@ -0,0 +1,24 @@
# Skill Registry — go-migrate
Generated: 2026-04-21
## Compact Rules
### Go conventions
- Use existing error wrapping pattern: `fmt.Errorf("context: %w", err)`
- Channel-based pipeline — keep goroutine lifecycle clean (close channels in correct order)
- No comments unless non-obvious WHY; no docstrings
- Prefer named returns only when it aids clarity in short functions
- Use `strings.EqualFold` for case-insensitive column name comparison
### Project conventions
- Config structs live in `internal/app/config/`
- ETL interfaces live in `internal/app/etl/types.go`
- Transformer implementations in `internal/app/etl/transformers/`
- Azure operations via `internal/app/azure/main.go`
- Per-job transformer creation (not shared) when job has storage config
## User Skills
| Trigger | Skill |
|---------|-------|
| sdd-* | SDD workflow skills |

View File

@@ -1,2 +1,12 @@
PG_FROM_DB_URL=postgresql://postgres:password@localhost:5432/db
PG_TO_DB_URL=postgresql://postgres:password@localhost:5432/db
SOURCE_DB_URL=sqlserver://sa:password@localhost:1433?database=master&packet+size=32767&loc=UTC
TARGET_DB_URL=postgresql://postgres:password@localhost:5432/db
LOG_LEVEL=INFO
AZ_STORAGE_ENABLED=false
AZ_ACCOUNT_NAME=
AZ_CONTAINER=
AZ_ACCOUNT_KEY=
AZ_USE_HTTPS=true
AZ_SERVICE_URL=
AZ_PREFIX=

3
.gitignore vendored
View File

@@ -4,6 +4,7 @@
*.dll
*.so
*.dylib
bin/
# Test binary, built with `go test -c`
*.test
@@ -26,5 +27,5 @@ go.work.sum
# Editor/IDE
# .idea/
# .vscode/
.vscode/
.temp

80
Makefile Normal file
View File

@@ -0,0 +1,80 @@
.PHONY: build build-linux build-windows build-all clean help
# Variables
BINARY_NAME=go-migrate
CMD_PATH=./cmd/go_migrate
OUTPUT_DIR=bin
VERSION?=$(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
GIT_COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
# Flags de compilación
LD_FLAGS=-ldflags="-s -w -X main.Version=$(VERSION) -X main.BuildTime=$(BUILD_TIME) -X main.GitCommit=$(GIT_COMMIT)"
# Default: compilar para el SO actual
build: build-$(OS)
ifeq ($(OS),Windows_NT)
build-native: build-windows
else
build-native: build-linux
endif
# Compilar para Linux (sin CGO para máxima compatibilidad)
build-linux:
@echo "Compilando para Linux..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64"
# Compilar para Windows
build-windows:
@echo "Compilando para Windows..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe"
# Compilar para ambas plataformas
build-all: build-linux build-windows
@echo ""
@echo "Binarios compilados:"
@ls -lh $(OUTPUT_DIR)/$(BINARY_NAME)*
# Compilar para Linux arm64 (opcional, para Raspberry Pi, etc.)
build-linux-arm64:
@echo "Compilando para Linux ARM64..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64"
# Limpiar binarios
clean:
@echo "Limpiando binarios..."
@rm -rf $(OUTPUT_DIR)
@echo "Limpieza completada"
# Ayuda
help:
@echo "Comandos disponibles:"
@echo ""
@echo " make build - Compilar para el SO actual (Linux/Windows)"
@echo " make build-linux - Compilar para Linux x86_64"
@echo " make build-windows - Compilar para Windows x86_64"
@echo " make build-linux-arm64 - Compilar para Linux ARM64 (opcional)"
@echo " make build-all - Compilar para Linux y Windows"
@echo " make clean - Eliminar binarios compilados"
@echo " make help - Mostrar esta ayuda"
@echo ""
@echo "Ejemplos de uso:"
@echo " make build-all # Crear binarios para ambas plataformas"
@echo " make build-linux OS= # Crear solo para Linux"
@echo ""

View File

@@ -1,110 +0,0 @@
package main
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/google/uuid"
)
type Batch struct {
Id uuid.UUID
ParentId uuid.UUID
LowerLimit int64
UpperLimit int64
IsLowerLimitInclusive bool
ShouldUseRange bool
RetryCounter int
}
func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, job MigrationJob) (int64, error) {
query := `
SELECT
SUM(p.rows) AS count
FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id
JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name`
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
var rowsCount int64
err := db.QueryRowContext(ctxTimeout, query, sql.Named("schema", job.Schema), sql.Named("table", job.Table)).Scan(&rowsCount)
if err != nil {
return 0, err
}
return rowsCount, nil
}
func calculateBatchesMssql(ctx context.Context, db *sql.DB, job MigrationJob, batchCount int64) ([]Batch, error) {
query := fmt.Sprintf(`
SELECT
MIN([%s]) AS lower_limit,
MAX([%s]) AS upper_limit
FROM
(SELECT [%s], NTILE(@batchCount) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T
GROUP BY batch_id
ORDER BY batch_id`, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.PrimaryKey, job.Schema, job.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
rows, err := db.QueryContext(ctxTimeout, query, sql.Named("batchCount", batchCount))
if err != nil {
return nil, err
}
defer rows.Close()
batches := make([]Batch, 0, batchCount)
for rows.Next() {
batch := Batch{
Id: uuid.New(),
ShouldUseRange: true,
RetryCounter: 0,
IsLowerLimitInclusive: true,
}
if err := rows.Scan(&batch.LowerLimit, &batch.UpperLimit); err != nil {
return nil, err
}
batches = append(batches, batch)
}
if err := rows.Err(); err != nil {
return nil, err
}
return batches, nil
}
func batchGeneratorMssql(ctx context.Context, db *sql.DB, job MigrationJob) ([]Batch, error) {
rowsCount, err := estimateTotalRowsMssql(ctx, db, job)
if err != nil {
return nil, err
}
var batchCount int64 = 1
if rowsCount > RowsPerBatch {
batchCount = rowsCount / RowsPerBatch
} else {
return []Batch{{
Id: uuid.New(),
ShouldUseRange: false,
RetryCounter: 0,
}}, nil
}
batches, err := calculateBatchesMssql(ctx, db, job, batchCount)
if err != nil {
return nil, err
}
return batches, nil
}

View File

@@ -1,73 +0,0 @@
package main
import (
"fmt"
"strings"
)
func buildExtractQueryMssql(job MigrationJob, columns []ColumnType, includeRange bool, isMinInclusive bool) string {
var sbQuery strings.Builder
sbQuery.WriteString("SELECT ")
if len(columns) == 0 {
sbQuery.WriteString("*")
} else {
for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.name)
if col.unifiedType == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.name)
}
if i < len(columns)-1 {
sbQuery.WriteString(", ")
}
}
}
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", job.Schema, job.Table)
if includeRange {
fmt.Fprintf(&sbQuery, " WHERE [%s]", job.PrimaryKey)
if isMinInclusive {
sbQuery.WriteString(" >=")
} else {
sbQuery.WriteString(" >")
}
fmt.Fprintf(&sbQuery, " @min AND [%s] <= @max", job.PrimaryKey)
}
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", job.PrimaryKey)
return sbQuery.String()
}
func buildExtractQueryPostgres(job MigrationJob, columns []ColumnType) string {
var sbColumns strings.Builder
if len(columns) == 0 {
sbColumns.WriteString("*")
} else {
for i, col := range columns {
if col.unifiedType == "GEOMETRY" {
sbColumns.WriteString(`ST_AsEWKB("`)
sbColumns.WriteString(col.name)
sbColumns.WriteString(`") AS "`)
sbColumns.WriteString(col.name)
sbColumns.WriteString(`"`)
} else {
sbColumns.WriteString(`"`)
sbColumns.WriteString(col.name)
sbColumns.WriteString(`"`)
}
if i < len(columns)-1 {
sbColumns.WriteString(", ")
}
}
}
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), job.Schema, job.Table, job.PrimaryKey)
}

View File

@@ -1,77 +0,0 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func connectToSqlServer() (*sql.DB, error) {
db, err := sql.Open("sqlserver", config.App.SourceDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to sqlserver: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("Unable to ping sqlserver: %w", err)
}
return db, nil
}
func connectToPostgres() (*pgxpool.Pool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
pool, err := pgxpool.New(ctx, config.App.TargetDbUrl)
if err != nil {
return nil, fmt.Errorf("Unable to connect to postgres: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("Unable to ping postgres: %w", err)
}
return pool, nil
}
func connectToDatabases() (*sql.DB, *pgxpool.Pool, error) {
var sourceDbErr, targetDbErr error
var sourceDb *sql.DB
var targetDb *pgxpool.Pool
var wg sync.WaitGroup
wg.Go(func() {
sourceDb, sourceDbErr = connectToSqlServer()
if sourceDbErr != nil {
log.Error("Unable to connect to source db: ", sourceDbErr)
}
})
wg.Go(func() {
targetDb, targetDbErr = connectToPostgres()
if targetDbErr != nil {
log.Error("Unable to connect to target db: ", targetDbErr)
}
})
wg.Wait()
if sourceDbErr != nil || targetDbErr != nil {
return nil, nil, errors.New("Unable to connect to databases")
}
return sourceDb, targetDb, nil
}

View File

@@ -1,102 +0,0 @@
package main
import (
"context"
"fmt"
"sync"
"github.com/google/uuid"
)
type ExtractorError struct {
Batch
LastId int64
HasLastId bool
Msg string
}
func (e *ExtractorError) Error() string {
return e.Msg
}
const maxRetryAttempts = 3
func extractorErrorHandler(
ctx context.Context,
chErrorsIn <-chan ExtractorError,
chBatchesOut chan<- Batch,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.RetryCounter >= maxRetryAttempts {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
wgActiveBatches.Done()
continue
}
newBatch := err.Batch
newBatch.RetryCounter++
if err.HasLastId {
newBatch.ParentId = err.Id
newBatch.Id = uuid.New()
newBatch.LowerLimit = err.LastId
newBatch.IsLowerLimitInclusive = false
}
select {
case chBatchesOut <- newBatch:
case <-ctx.Done():
return
}
}
}
}
func ExtractorErrorFromLastRowMssql(lastRow UnknownRowValues, indexPrimaryKey int, batch *Batch, previousError error) ExtractorError {
lastIdRawValue := lastRow[indexPrimaryKey]
lastId, ok := ToInt64(lastIdRawValue)
if !ok {
currentBatch := *batch
currentBatch.RetryCounter = maxRetryAttempts
return ExtractorError{
Batch: currentBatch,
HasLastId: true,
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
}
}
return ExtractorError{
Batch: *batch,
HasLastId: true,
LastId: lastId,
Msg: previousError.Error(),
}
}

View File

@@ -1,242 +0,0 @@
package main
import (
"context"
"database/sql"
"errors"
"slices"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
type UnknownRowValues = []any
type Chunk struct {
Id uuid.UUID
BatchId uuid.UUID
Data []UnknownRowValues
RetryCounter int
}
func extractFromMssql(
ctx context.Context,
db *sql.DB,
job MigrationJob,
columns []ColumnType,
chunkSize int,
chBatchesIn <-chan Batch,
chChunksOut chan<- Chunk,
chErrorsOut chan<- ExtractorError,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
return strings.EqualFold(col.name, job.PrimaryKey)
})
if indexPrimaryKey == -1 {
jobError := JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}
select {
case <-ctx.Done():
return
case chJobErrorsOut <- jobError:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
if abort := processBatch(ctx, db, job, columns, chunkSize, batch, indexPrimaryKey, chChunksOut, chErrorsOut, wgActiveBatches); abort {
return
}
}
}
}
func processBatch(
ctx context.Context,
db *sql.DB,
job MigrationJob,
columns []ColumnType,
chunkSize int,
batch Batch,
indexPrimaryKey int,
chChunksOut chan<- Chunk,
chErrorsOut chan<- ExtractorError,
wgActiveBatches *sync.WaitGroup,
) (abort bool) {
query := buildExtractQueryMssql(job, columns, batch.ShouldUseRange, batch.IsLowerLimitInclusive)
log.Debug("Query used to extract data from mssql: ", query)
var queryArgs []any
if batch.ShouldUseRange {
queryArgs = append(queryArgs,
sql.Named("min", batch.LowerLimit),
sql.Named("max", batch.UpperLimit),
)
}
queryStartTime := time.Now()
rows, err := db.QueryContext(ctx, query, queryArgs...)
if err != nil {
select {
case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
case <-ctx.Done():
return true
}
return false
}
defer rows.Close()
log.Debugf("Query executed in %v", time.Since(queryStartTime))
rowsChunk := make([]UnknownRowValues, 0, chunkSize)
totalRowsExtracted := 0
chunkStartTime := time.Now()
for rows.Next() {
values := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range values {
scanArgs[i] = &values[i]
}
if err := rows.Scan(scanArgs...); err != nil {
if len(rowsChunk) == 0 {
select {
case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
case <-ctx.Done():
return true
}
return false
}
lastRow := rowsChunk[len(rowsChunk)-1]
select {
case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err):
case <-ctx.Done():
return true
}
select {
case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
case <-ctx.Done():
return true
}
return false
}
rowsChunk = append(rowsChunk, values)
totalRowsExtracted++
if len(rowsChunk) >= chunkSize {
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(chunkSize) / chunkDuration.Seconds()
log.Infof("Extracted chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
select {
case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
case <-ctx.Done():
return true
}
rowsChunk = make([]UnknownRowValues, 0, chunkSize)
chunkStartTime = time.Now()
}
}
if err := rows.Err(); err != nil {
if errors.Is(err, ctx.Err()) {
return true
}
if len(rowsChunk) == 0 {
select {
case chErrorsOut <- ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}:
case <-ctx.Done():
return true
}
return false
}
lastRow := rowsChunk[len(rowsChunk)-1]
select {
case chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err):
case <-ctx.Done():
return true
}
return false
}
if len(rowsChunk) > 0 {
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rowsChunk)) / chunkDuration.Seconds()
log.Infof("Extracted final chunk: %d rows in %v (%.0f rows/sec) - Total: %d rows", len(rowsChunk), chunkDuration, rowsPerSec, totalRowsExtracted)
select {
case chChunksOut <- Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
case <-ctx.Done():
return true
}
}
wgActiveBatches.Done()
return false
}
func extractFromPostgres(ctx context.Context, job MigrationJob, columns []ColumnType, chunkSize int, db *pgxpool.Pool, out chan<- []UnknownRowValues) error {
query := buildExtractQueryPostgres(job, columns)
log.Debug("Query used to extract data from postgres: ", query)
rows, err := db.Query(ctx, query)
if err != nil {
return err
}
defer rows.Close()
rowsChunk := make([]UnknownRowValues, 0, chunkSize)
for rows.Next() {
values, err := rows.Values()
if err != nil {
return err
}
rowsChunk = append(rowsChunk, values)
if len(rowsChunk) >= chunkSize {
out <- rowsChunk
rowsChunk = make([]UnknownRowValues, 0, chunkSize)
log.Infof("Chunk send... %+v", job)
}
}
if len(rowsChunk) > 0 {
out <- rowsChunk
log.Infof("Chunk send... %+v", job)
}
return nil
}

View File

@@ -1,283 +0,0 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func GetUnifiedType(systemType string) string {
systemType = strings.ToLower(systemType)
if systemType == "varchar" || systemType == "char" || systemType == "nvarchar" || systemType == "nchar" || systemType == "text" || systemType == "ntext" {
return "STRING"
}
if systemType == "int" || systemType == "int4" || systemType == "integer" || systemType == "smallint" || systemType == "int2" || systemType == "bigint" || systemType == "int8" || systemType == "tinyint" {
return "INTEGER"
}
if systemType == "decimal" || systemType == "numeric" {
return "DECIMAL"
}
if systemType == "float" || systemType == "real" || systemType == "double precision" {
return "FLOAT"
}
if systemType == "bit" || systemType == "boolean" {
return "BOOLEAN"
}
if systemType == "date" {
return "DATE"
}
if systemType == "time" || systemType == "time without time zone" {
return "TIME"
}
if systemType == "datetime" || systemType == "datetime2" || systemType == "timestamp" || systemType == "timestamptz" || systemType == "timestamp with time zone" {
return "TIMESTAMP"
}
if systemType == "binary" || systemType == "varbinary" || systemType == "image" || systemType == "bytea" {
return "BINARY"
}
if systemType == "uniqueidentifier" || systemType == "uuid" {
return "UUID"
}
if systemType == "json" {
return "JSON"
}
if systemType == "geometry" || systemType == "geography" {
return "GEOMETRY"
}
return strings.ToUpper(systemType)
}
func MapPostgresColumn(column ColumnType, maxLength *int64, precision *int64, scale *int64) ColumnType {
stringTypes := map[string]bool{
"varchar": true, "char": true, "character": true, "text": true, "character varying": true,
}
decimalTypes := map[string]bool{
"decimal": true, "numeric": true,
}
if stringTypes[column.systemType] {
if maxLength != nil {
column.maxLength = *maxLength
column.hasMaxLength = true
} else {
column.maxLength = -1
column.hasMaxLength = false
}
column.hasPrecisionScale = false
column.precision = -1
column.scale = -1
} else if decimalTypes[column.systemType] {
column.hasMaxLength = false
column.maxLength = -1
if precision != nil && scale != nil {
column.precision = *precision
column.scale = *scale
column.hasPrecisionScale = true
} else {
column.precision = -1
column.scale = -1
column.hasPrecisionScale = false
}
} else {
column.hasMaxLength = false
column.maxLength = -1
column.hasPrecisionScale = false
column.precision = -1
column.scale = -1
}
column.unifiedType = GetUnifiedType(column.systemType)
return column
}
func GetColumnTypesPostgres(db *pgxpool.Pool, migrationJob MigrationJob) ([]ColumnType, error) {
query := `
SELECT
c.column_name AS name,
c.data_type AS user_type,
c.udt_name AS system_type,
(CASE WHEN c.is_nullable = 'YES' THEN TRUE ELSE FALSE END) AS nullable,
c.character_maximum_length AS max_length,
c.numeric_precision AS precision,
c.numeric_scale AS scale
FROM information_schema.columns c
WHERE c.table_schema = $1 AND c.table_name = $2
ORDER BY c.ordinal_position;
`
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
rows, err := db.Query(ctx, query, migrationJob.Schema, migrationJob.Table)
if err != nil {
return nil, fmt.Errorf("Error querying column types: %w", err)
}
defer rows.Close()
var colTypes []ColumnType
for rows.Next() {
var column ColumnType
var scanMaxLength *int64
var scanPrecision *int64
var scanScale *int64
if err := rows.Scan(
&column.name,
&column.userType,
&column.systemType,
&column.nullable,
&scanMaxLength,
&scanPrecision,
&scanScale,
); err != nil {
return nil, fmt.Errorf("Error scanning column type results: %w", err)
}
colTypes = append(colTypes, MapPostgresColumn(column, scanMaxLength, scanPrecision, scanScale))
}
return colTypes, nil
}
func MapMssqlColumn(column ColumnType) ColumnType {
stringTypes := map[string]bool{
"varchar": true, "char": true, "nvarchar": true, "nchar": true, "text": true, "ntext": true,
}
decimalTypes := map[string]bool{
"decimal": true, "numeric": true,
}
if stringTypes[column.systemType] {
column.hasMaxLength = true
if column.systemType == "nvarchar" || column.systemType == "nchar" {
if column.maxLength > 0 {
column.maxLength = column.maxLength / 2
}
}
column.hasPrecisionScale = false
column.precision = -1
column.scale = -1
} else if decimalTypes[column.systemType] {
column.hasMaxLength = false
column.maxLength = -1
column.hasPrecisionScale = true
} else {
column.hasMaxLength = false
column.maxLength = -1
column.hasPrecisionScale = false
column.precision = -1
column.scale = -1
}
column.unifiedType = GetUnifiedType(column.systemType)
return column
}
func GetColumnTypesMssql(db *sql.DB, migrationJob MigrationJob) ([]ColumnType, error) {
query := `
SELECT
c.name AS name,
t.name AS user_type,
CASE WHEN t.is_user_defined = 0 THEN t.name ELSE bt.name END AS system_type,
c.is_nullable AS nullable,
c.max_length AS max_length,
c.precision AS precision,
c.scale AS scale
FROM sys.columns c
JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table
ORDER BY c.column_id;
`
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
rows, err := db.QueryContext(ctx, query, sql.Named("schema", migrationJob.Schema), sql.Named("table", migrationJob.Table))
if err != nil {
return nil, fmt.Errorf("Error querying column types: %w", err)
}
defer rows.Close()
var colTypes []ColumnType
for rows.Next() {
var column ColumnType
if err := rows.Scan(
&column.name,
&column.userType,
&column.systemType,
&column.nullable,
&column.maxLength,
&column.precision,
&column.scale,
); err != nil {
return nil, fmt.Errorf("Error scanning column type results: %W", err)
}
if strings.HasPrefix(column.name, "graph_id") && column.systemType == "bigint" {
continue
}
colTypes = append(colTypes, MapMssqlColumn(column))
}
return colTypes, nil
}
func GetColumnTypes(sourceDb *sql.DB, targetDb *pgxpool.Pool, migrationJob MigrationJob) ([]ColumnType, []ColumnType, error) {
var sourceDbErr error
var targetDbErr error
var sourceColTypes []ColumnType
var targetColTypes []ColumnType
var wg sync.WaitGroup
wg.Go(func() {
sourceColTypes, sourceDbErr = GetColumnTypesMssql(sourceDb, migrationJob)
if sourceDbErr != nil {
log.Error("Error (sourceDb): ", sourceDbErr)
}
})
wg.Go(func() {
targetColTypes, targetDbErr = GetColumnTypesPostgres(targetDb, migrationJob)
if targetDbErr != nil {
log.Error("Error (targetDb): ", targetDbErr)
}
})
wg.Wait()
if sourceDbErr != nil || targetDbErr != nil {
return nil, nil, errors.New("Error querying column types")
}
return sourceColTypes, targetColTypes, nil
}

View File

@@ -1,65 +0,0 @@
package main
import (
"context"
"fmt"
"sync"
)
type LoaderError struct {
Chunk
Msg string
}
func (e *LoaderError) Error() string {
return e.Msg
}
func loaderErrorHandler(
ctx context.Context,
chErrorsIn <-chan LoaderError,
chChunksOut chan<- Chunk,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup,
) {
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.RetryCounter >= maxRetryAttempts {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("chunk %v reached max retries (%d)", err.Id, maxRetryAttempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
wgActiveChunks.Done()
continue
}
err.RetryCounter++
select {
case chChunksOut <- err.Chunk:
case <-ctx.Done():
return
}
}
}
}

View File

@@ -1,191 +0,0 @@
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
mssql "github.com/microsoft/go-mssqldb"
log "github.com/sirupsen/logrus"
)
func loadRowsPostgres(
ctx context.Context,
db *pgxpool.Pool,
job MigrationJob,
columns []ColumnType,
chChunksIn <-chan Chunk,
chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup,
) {
tableId := pgx.Identifier{job.Schema, job.Table}
colNames := Map(columns, func(col ColumnType) string {
return col.name
})
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case chunk, ok := <-chChunksIn:
if !ok {
return
}
if abort := loadChunkPostgres(ctx, db, tableId, colNames, chunk, chErrorsOut, chJobErrorsOut, wgActiveChunks); abort {
return
}
}
}
}
func loadChunkPostgres(
ctx context.Context,
db *pgxpool.Pool,
identifier pgx.Identifier,
colNames []string,
chunk Chunk,
chErrorsOut chan<- LoaderError,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup,
) (abort bool) {
chunkStartTime := time.Now()
_, err := db.CopyFrom(
ctx,
identifier,
colNames,
pgx.CopyFromRows(chunk.Data),
)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == "23505" {
select {
case chJobErrorsOut <- JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal data integrity error in table %s", identifier.Sanitize()),
Prev: err,
}:
case <-ctx.Done():
}
wgActiveChunks.Done()
return true
}
}
select {
case chErrorsOut <- LoaderError{Chunk: chunk, Msg: err.Error()}:
case <-ctx.Done():
return true
}
return false
}
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(chunk.Data)) / chunkDuration.Seconds()
log.Infof("Loaded chunk: %d rows in %v (%.0f rows/sec)", len(chunk.Data), chunkDuration, rowsPerSec)
wgActiveChunks.Done()
return false
}
func loadRowsMssql(ctx context.Context, job MigrationJob, columns []ColumnType, db *sql.DB, in <-chan []UnknownRowValues) error {
chunkCount := 0
totalRowsLoaded := 0
for rows := range in {
chunkStartTime := time.Now()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("error starting transaction: %w", err)
}
fullTableName := fmt.Sprintf("[%s].[%s]", job.Schema, job.Table)
colNames := Map(columns, func(col ColumnType) string {
return col.name
})
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, colNames...))
if err != nil {
tx.Rollback()
return fmt.Errorf("error preparing bulk copy statement: %w", err)
}
copyStartTime := time.Now()
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return fmt.Errorf("error executing row insert: %w", err)
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return fmt.Errorf("error flushing bulk data: %w", err)
}
err = stmt.Close()
if err != nil {
tx.Rollback()
return fmt.Errorf("error closing statement: %w", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("error committing transaction: %w", err)
}
rowsAffected, _ := result.RowsAffected()
chunkCount++
totalRowsLoaded += int(rowsAffected)
copyDuration := time.Since(copyStartTime)
chunkDuration := time.Since(chunkStartTime)
rowsPerSec := float64(len(rows)) / chunkDuration.Seconds()
log.Infof("Loaded chunk #%d (MSSQL): %d rows in %v (copy: %v, %.0f rows/sec) - Total: %d rows", chunkCount, len(rows), chunkDuration, copyDuration, rowsPerSec, totalRowsLoaded)
}
return nil
}
func Map[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}
func fakeLoader(job MigrationJob, columns []ColumnType, in <-chan [][]any) {
for rows := range in {
log.Debugf("Chunk received, loading data into...")
for i, rowValues := range rows {
if i%100 == 0 {
logSampleRow(job, columns, rowValues, fmt.Sprintf("row %d", i))
}
}
}
}

View File

@@ -3,6 +3,7 @@ package main
import (
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
log "github.com/sirupsen/logrus"
)
@@ -13,5 +14,13 @@ func configureLog() {
DisableSorting: false,
PadLevelText: true,
})
log.SetLevel(log.InfoLevel)
logLevelEnv := config.App.LogLevel
logLevel, err := log.ParseLevel(logLevelEnv)
if err != nil {
log.Warnf("Nivel de log inválido '%s', usando INFO por defecto", logLevelEnv)
logLevel = log.InfoLevel
}
log.SetLevel(logLevel)
}

View File

@@ -2,63 +2,185 @@ package main
import (
"context"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus"
)
type MigrationJob struct {
Schema string
Table string
PrimaryKey string
}
var migrationJobs []MigrationJob = []MigrationJob{
{
Schema: "Cartografia",
Table: "MANZANA",
PrimaryKey: "GDB_ARCHIVE_OID",
},
{
Schema: "Red",
Table: "PUERTO",
PrimaryKey: "ID_PUERTO",
},
}
const (
NumExtractors int = 4
NumLoaders int = 8
ChunkSize int = 25000
QueueSize int = 8
ChunksPerBatch int = 16
RowsPerBatch int64 = int64(ChunkSize * ChunksPerBatch)
"golang.org/x/sync/errgroup"
)
func main() {
configureLog()
migrationConfig, err := config.ReadMigrationConfig()
if err != nil {
log.Fatalf("error leyendo configuracion: %v", err)
}
// log.Debugf("Config: %+v", migrationConfig)
startTime := time.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Info("=== Starting migration ===")
log.Infof("Number of loaders: %d, Chunk size: %d", NumLoaders, ChunkSize)
sourceDb, targetDb, connError := connectToDatabases()
if connError != nil {
log.Fatal("Connection error: ", connError)
var wgConnect errgroup.Group
var sourceDb, targetDb dbwrapper.DbWrapper
wgConnect.Go(func() error {
var err error
sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
wgConnect.Go(func() error {
var err error
targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
if err != nil {
return err
}
return nil
})
if err := wgConnect.Wait(); err != nil {
log.Error("Connection error: ", err)
return
}
defer sourceDb.Close()
defer targetDb.Close()
for _, job := range migrationJobs {
log.Infof(">>> Processing job: %s.%s <<<", job.Schema, job.Table)
processMigrationJob(ctx, sourceDb, targetDb, job)
results := processMigrationJobs(ctx, sourceDb, targetDb, migrationConfig.Jobs, migrationConfig.MaxParallelWorkers)
log.Info("=== RESUMEN DE MIGRACIÓN ===")
var totalProcessed, totalErrors int64
for _, res := range results {
status := "OK"
if res.Error != nil {
status = "FAILED"
log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v | Error: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration, res.Error)
} else {
log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
}
totalProcessed += res.RowsLoaded
if res.Error != nil {
totalErrors++
}
}
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
totalDuration := time.Since(startTime)
log.Infof("=== Migration completed successfully! ===")
log.Infof("Total migration time: %v", totalDuration)
}
func processMigrationJobs(
ctx context.Context,
sourceDb dbwrapper.DbWrapper,
targetDb dbwrapper.DbWrapper,
jobs []config.Job,
maxParallelWorkers int,
) []models.JobResult {
if len(jobs) == 0 {
log.Info("No migration jobs configured")
return []models.JobResult{}
}
if maxParallelWorkers <= 0 {
maxParallelWorkers = 1
}
if maxParallelWorkers > len(jobs) {
maxParallelWorkers = len(jobs)
}
log.Infof("Starting migration with %d parallel worker(s)", maxParallelWorkers)
chJobResults := make(chan models.JobResult, len(jobs))
chJobs := make(chan config.Job, len(jobs))
var wgJobs sync.WaitGroup
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb)
loader := loaders.NewGenericLoader(targetDb)
var azureClient *azure.Client
if config.App.AzureStorage.Enabled {
var err error
azureClient, err = azure.NewClient(config.App.AzureStorage)
if err != nil {
log.Fatalf("Failed to create Azure storage client: %v", err)
}
}
for i := range maxParallelWorkers {
wgJobs.Go(func() {
for job := range chJobs {
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
res := processMigrationJob(
ctx,
targetDb,
sourceTableAnalyzer,
targetTableAnalyzer,
extractor,
azureClient,
loader,
job,
targetDb.GetDialect(),
)
chJobResults <- res
}
})
}
for _, job := range jobs {
chJobs <- job
}
close(chJobs)
go func() {
wgJobs.Wait()
close(chJobResults)
}()
var finalResults []models.JobResult
for res := range chJobResults {
finalResults = append(finalResults, res)
}
return finalResults
}
func connectWithTimeout(ctx context.Context, dbType string, dbUrl string, timeout time.Duration) (dbwrapper.DbWrapper, error) {
localCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sourceDb, err := dbwrapper.New(dbType)
if err != nil {
return nil, err
}
if err = sourceDb.Connect(localCtx, dbUrl); err != nil {
return nil, err
}
return sourceDb, nil
}

View File

@@ -2,135 +2,268 @@ package main
import (
"context"
"database/sql"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/microsoft/go-mssqldb"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
func buildTruncateQuery(targetDbType, schema, table, truncateMethod string) string {
if truncateMethod == "DELETE" {
if targetDbType == "postgres" {
return fmt.Sprintf(`DELETE FROM "%s"."%s"`, schema, table)
}
return fmt.Sprintf(`DELETE FROM [%s].[%s]`, schema, table)
}
if targetDbType == "postgres" {
return fmt.Sprintf(`TRUNCATE TABLE "%s"."%s"`, schema, table)
}
return fmt.Sprintf(`TRUNCATE TABLE [%s].[%s]`, schema, table)
}
func processMigrationJob(
ctx context.Context,
sourceDb *sql.DB,
targetDb *pgxpool.Pool,
job MigrationJob,
) {
jobStartTime := time.Now()
log.Infof("Starting migration job: %s.%s [PK: %s]", job.Schema, job.Table, job.PrimaryKey)
sourceColTypes, targetColTypes, err := GetColumnTypes(sourceDb, targetDb, job)
if err != nil {
log.Fatal("Unexpected error: ", err)
}
logColumnTypes(sourceColTypes, "Source col types")
logColumnTypes(targetColTypes, "Target col types")
jobCtx, cancel := context.WithCancel(ctx)
targetDbWrapper dbwrapper.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor,
azureClient *azure.Client,
loader etl.Loader,
job config.Job,
targetDbType string,
) models.JobResult {
transformer := transformers.NewMssqlTransformer(job.ToStorage, job.SourceTable, azureClient)
localCtx, cancel := context.WithCancel(ctx)
defer cancel()
batches, err := batchGeneratorMssql(jobCtx, sourceDb, job)
result := models.JobResult{
JobName: job.Name,
StartTime: time.Now(),
}
var rowsRead, rowsLoaded, rowsFailed int64
var wgQueryColumnTypes errgroup.Group
var sourceColTypes, targetColTypes []models.ColumnType
wgQueryColumnTypes.Go(func() error {
var err error
sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo)
if err != nil {
return err
}
return nil
})
wgQueryColumnTypes.Go(func() error {
var err error
targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo)
if err != nil {
return err
}
return nil
})
err := wgQueryColumnTypes.Wait()
if err != nil {
result.Error = err
return result
}
preSqlQueries := job.TargetTable.PreSQL
if job.TruncateTarget {
truncateQuery := buildTruncateQuery(targetDbType, job.TargetTable.Schema, job.TargetTable.Table, job.TruncateMethod)
preSqlQueries = append([]string{truncateQuery}, job.TargetTable.PreSQL...)
}
for _, query := range preSqlQueries {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err
return result
}
}
partitions, err := table_analyzers.PartitionRangeGenerator(
localCtx,
sourceTableAnalyzer,
job.SourceTable.TableInfo,
job.SourceTable.PrimaryKey,
job.RowsPerPartition,
job.Range,
)
if err != nil {
log.Error("Unexpected error calculating batch ranges: ", err)
}
chJobErrors := make(chan JobError, 50)
chBatches := make(chan Batch, QueueSize)
chExtractorErrors := make(chan ExtractorError, QueueSize)
chChunksRaw := make(chan Chunk, QueueSize)
chChunksTransformed := make(chan Chunk, QueueSize)
chLoadersErrors := make(chan LoaderError, QueueSize)
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
chPartitions := make(chan models.Partition, job.QueueSize)
chBatchesRaw := make(chan models.Batch, job.QueueSize)
chBatchesTransformed := make(chan models.Batch, job.QueueSize)
var wgActivePartitions sync.WaitGroup
var wgActiveBatches sync.WaitGroup
var wgActiveChunks sync.WaitGroup
var wgExtractors sync.WaitGroup
var wgTransformers sync.WaitGroup
var wgLoaders sync.WaitGroup
go func() {
if err := jobErrorHandler(jobCtx, chJobErrors); err != nil {
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
cancel()
result.Error = err
}
}()
go extractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches)
go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks)
go custom_errors.ExtractorErrorHandler(
localCtx,
job.Retry,
job.MaxPartitionErrrors,
chExtractorErrors,
chPartitions,
chJobErrors,
&wgActivePartitions,
)
go custom_errors.LoaderErrorHandler(
localCtx,
job.Retry,
job.MaxChunkErrors,
chLoadersErrors,
chBatchesTransformed,
chJobErrors,
&wgActiveBatches,
)
maxExtractors := min(NumExtractors, len(batches))
log.Infof("Starting %d extractors...", maxExtractors)
extractStartTime := time.Now()
maxExtractors := min(job.MaxExtractors, len(partitions))
log.Infof("Starting %d extractor(s)...", maxExtractors)
for range maxExtractors {
wgExtractors.Go(func() {
extractFromMssql(jobCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunksRaw, chExtractorErrors, chJobErrors, &wgActiveBatches)
extractors.Consume(
localCtx,
extractor,
job.SourceTable,
sourceColTypes,
job.BatchSize,
chPartitions,
chBatchesRaw,
chExtractorErrors,
chJobErrors,
&wgActivePartitions,
&rowsRead,
)
})
}
wgActiveBatches.Add(len(batches))
wgActivePartitions.Add(len(partitions))
go func() {
for _, batch := range batches {
chBatches <- batch
for _, batch := range partitions {
chPartitions <- batch
}
}()
log.Infof("Starting %d transformers...", maxExtractors)
transformStartTime := time.Now()
log.Infof("Starting %d transformer(s)...", maxExtractors)
for range maxExtractors {
wgTransformers.Go(func() {
transformRowsMssql(jobCtx, sourceColTypes, chChunksRaw, chChunksTransformed, chJobErrors, &wgActiveChunks)
transformer.Exec(
localCtx,
sourceColTypes,
chBatchesRaw,
chBatchesTransformed,
chJobErrors,
&wgActiveBatches,
)
})
}
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
loadStartTime := time.Now()
log.Infof("Starting %d loader(s)...", job.MaxLoaders)
for range NumLoaders {
for range job.MaxLoaders {
wgLoaders.Go(func() {
loadRowsPostgres(jobCtx, targetDb, job, targetColTypes, chChunksTransformed, chLoadersErrors, chJobErrors, &wgActiveChunks)
loader.Exec(
localCtx,
job.TargetTable,
targetColTypes,
chBatchesTransformed,
chLoadersErrors,
chJobErrors,
&wgActiveBatches,
&rowsLoaded,
)
})
}
go func() {
wgActiveBatches.Wait()
close(chBatches)
log.Debugf("Waiting for goroutines (%v)", job.Name)
wgActivePartitions.Wait()
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
close(chPartitions)
log.Debugf("chPartitions is closed (%v)", job.Name)
close(chExtractorErrors)
log.Debugf("chExtractorErrors is closed (%v)", job.Name)
wgExtractors.Wait()
log.Infof("Extraction completed in %v", time.Since(extractStartTime))
close(chChunksRaw)
log.Debugf("wgExtractors is empty (%v)", job.Name)
close(chBatchesRaw)
log.Debugf("chBatchesRaw is closed (%v)", job.Name)
wgTransformers.Wait()
log.Infof("Transformation completed in %v", time.Since(transformStartTime))
log.Debugf("wgTransformers is empty (%v)", job.Name)
wgActiveChunks.Wait()
close(chChunksTransformed)
wgActiveBatches.Wait()
log.Debugf("wgActiveBatches is empty (%v)", job.Name)
close(chBatchesTransformed)
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
close(chLoadersErrors)
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
wgLoaders.Wait()
log.Infof("Loading completed in %v", time.Since(loadStartTime))
log.Debugf("wgLoaders is empty (%v)", job.Name)
cancel()
}()
<-jobCtx.Done()
log.Infof("Migration job completed. Total time: %v", time.Since(jobStartTime))
}
func logColumnTypes(columnTypes []ColumnType, label string) {
log.Debug(label)
for _, col := range columnTypes {
log.Debugf("%+v", col)
for _, query := range job.TargetTable.PostSQL {
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
result.Error = err
return result
}
}
func logSampleRow(job MigrationJob, columns []ColumnType, rowValues UnknownRowValues, tag string) {
log.Infof("[%s.%s] Sample row: (%s)", job.Schema, job.Table, tag)
for i, col := range columns {
log.Infof("%s (%T): %v", col.Name(), rowValues[i], rowValues[i])
}
log.Debugf("waiting for local context to be done (%v)", job.Name)
<-localCtx.Done()
log.Debugf("local context done (%v)", job.Name)
if ctx.Err() != nil {
result.Error = ctx.Err()
}
result.Duration = time.Since(result.StartTime)
result.RowsRead = atomic.LoadInt64(&rowsRead)
result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
result.RowsFailed = atomic.LoadInt64(&rowsFailed)
if result.RowsRead != result.RowsLoaded {
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
}
return result
}

View File

@@ -1,149 +0,0 @@
package main
import (
"context"
"errors"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type transformerFunc func(any) (any, error)
type columnTransformPlan struct {
index int
fn transformerFunc
}
func transformRowsMssql(
ctx context.Context,
columns []ColumnType,
chChunksIn <-chan Chunk,
chChunksOut chan<- Chunk,
chJobErrorsOut chan<- JobError,
wgActiveChunks *sync.WaitGroup,
) {
transformationPlan := computeTransformationPlan(columns)
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case chunk, ok := <-chChunksIn:
if !ok {
return
}
if len(transformationPlan) == 0 {
select {
case chChunksOut <- chunk:
wgActiveChunks.Add(1)
continue
case <-ctx.Done():
return
}
}
chunkStartTime := time.Now()
err := processChunk(ctx, &chunk, transformationPlan)
if err != nil {
if errors.Is(err, ctx.Err()) {
return
}
select {
case chJobErrorsOut <- JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
}
return
}
log.Infof("Transformed chunk %s: %d rows in %v", chunk.Id, len(chunk.Data), time.Since(chunkStartTime))
select {
case chChunksOut <- chunk:
case <-ctx.Done():
return
}
wgActiveChunks.Add(1)
}
}
}
func computeTransformationPlan(columns []ColumnType) []columnTransformPlan {
var plan []columnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, columnTransformPlan{
index: i,
fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, columnTransformPlan{
index: i,
fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, columnTransformPlan{
index: i,
fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
}
const processChunkCtxCheck = 4096
func processChunk(ctx context.Context, chunk *Chunk, transformationPlan []columnTransformPlan) error {
for i, rowValues := range chunk.Data {
if i%processChunkCtxCheck == 0 {
if err := ctx.Err(); err != nil {
return err
}
}
for _, task := range transformationPlan {
val := rowValues[task.index]
if val == nil {
continue
}
transformed, err := task.fn(val)
if err != nil {
return err
}
rowValues[task.index] = transformed
}
}
return nil
}

74
config.yaml Normal file
View File

@@ -0,0 +1,74 @@
max_parallel_workers: 4
source_db_type: sqlserver
target_db_type: postgres
defaults:
max_extractors: 2
max_loaders: 4
queue_size: 8
batch_size: 25000
batches_per_partition: 8
truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE
max_partition_errrors: 5
max_chunk_errors: 5
retry:
attempts: 3
base_delay_ms: 500
max_delay_ms: 10000
max_jitter_ms: 500
jobs:
- name: cartografia_manzana
enabled: true
source:
schema: Cartografia
table: MANZANA
primary_key: GDB_ARCHIVE_OID
target:
schema: Cartografia
table: MANZANA
pre_sql:
- 'SELECT 1'
range:
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
- name: red_puerto
enabled: true
source:
schema: Red
table: PUERTO
primary_key: ID_PUERTO
target:
schema: Red
table: PUERTO
pre_sql:
- 'SELECT 1'
post_sql:
- "SELECT 1"
- name: infraestructura_site_holder__attach
source:
schema: Infraestructura
table: SITE_HOLDER__ATTACH
primary_key: GDB_ARCHIVE_OID
target:
schema: Infraestructura
table: SITE_HOLDER__ATTACH
to_storage:
columns:
- source: DATA
target: FILE_URL
mode: REFERENCE_ONLY
max_extractors: 8
max_loaders: 4
queue_size: 32
batch_size: 1
retry:
attempts: 5
base_delay_ms: 1000
max_delay_ms: 15000
max_jitter_ms: 500

15
go.mod
View File

@@ -1,26 +1,35 @@
module git.ksdemosapps.com/kylesoda/go-migrate
go 1.25.7
go 1.26
require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
github.com/gaspardle/go-mssqlclrgeo v0.0.0-20160129143314-97ceabf987a4
github.com/google/uuid v1.6.0
github.com/ilyakaznacheev/cleanenv v1.5.0
github.com/jackc/pgx/v5 v5.9.1
github.com/joho/godotenv v1.5.1
github.com/microsoft/go-mssqldb v1.9.8
github.com/sirupsen/logrus v1.9.4
github.com/twpayne/go-geom v1.6.1
golang.org/x/sync v0.19.0
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
github.com/BurntSushi/toml v1.6.0 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/net v0.51.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect
)

19
go.sum
View File

@@ -4,12 +4,19 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 h1:Hk5QBxZQC1jb2Fwj6mpz
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1/go.mod h1:IYus9qsFobWIc2YVwe/WPjcnyCkPKtnHAqUYeebc8z0=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.8.1 h1:/Zt+cDPnpC3OVDm/JKLOs7M2DKmLRIIp3XIx9pHHiig=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.8.1/go.mod h1:Ng3urmn6dYe8gnbCMoHHVl5APYz2txho3koEkV2o2HA=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.4.0 h1:E4MgwLBGeVB5f2MdcIVD3ELVAWpr+WD6MUe1i+tM/PA=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.4.0/go.mod h1:Y2b/1clN4zsAoUd/pgNAQHjLDnTis/6ROkUfyob6psM=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0 h1:nCYfgcSyHZXJI8J0IWE5MsCGlb2xp9fJiXyxWgmOFg4=
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0/go.mod h1:ucUjca2JtSZboY8IoUqyQyuuXvwbMBVwFOm0vdQPNhA=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 h1:jWQK1GI+LeGGUKBADtcH2rRqPxYB1Ljwms5gFA2LqrM=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4/go.mod h1:8mwH4klAm9DUgR2EEHyEEAQlRDvLPyg5fQry3y+cDew=
github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgvJqCH0sFfrBUTnUJSBrBf7++ypk+twtRs=
github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk=
github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY=
@@ -31,6 +38,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/ilyakaznacheev/cleanenv v1.5.0 h1:0VNZXggJE2OYdXE87bfSSwGxeiGt9moSR2lOrsHHvr4=
github.com/ilyakaznacheev/cleanenv v1.5.0/go.mod h1:a5aDzaJrLCQZsazHol1w8InnDcOX0OColm64SlIi6gk=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -41,6 +50,10 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/microsoft/go-mssqldb v1.9.8 h1:d4IFMvF/o+HdpXUqbBfzHvn/NlFA75YGcfHUUvDFJEM=
@@ -49,6 +62,8 @@ github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmd
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w=
@@ -71,6 +86,10 @@ golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 h1:slmdOY3vp8a7KQbHkL+FLbvbkgMqmXojpFUO/jENuqQ=
olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3/go.mod h1:oVgVk4OWVDi43qWBEyGhXgYxt7+ED4iYNpTngSLX2Iw=

View File

@@ -0,0 +1,87 @@
package azure
import (
"context"
"errors"
"fmt"
"net/url"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)
var (
ErrInvalidConnectionString = errors.New("invalid connection string")
ErrContainerNotFound = errors.New("container not found")
ErrBlobNotFound = errors.New("blob not found")
ErrInvalidInput = errors.New("invalid input parameters")
)
type Client struct {
client *azblob.Client
azureStorageConfig config.AzureStorageConfig
}
func NewClient(azureStorageConfig config.AzureStorageConfig) (*Client, error) {
protocol := "https"
if !azureStorageConfig.UseHTTPS {
protocol = "http"
}
blobEndpoint, _ := url.JoinPath(azureStorageConfig.ServiceURL, azureStorageConfig.AccountName)
connStr := fmt.Sprintf("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s;BlobEndpoint=%s;",
protocol, azureStorageConfig.AccountName, azureStorageConfig.AccountKey, blobEndpoint)
client, err := azblob.NewClientFromConnectionString(connStr, nil)
if err != nil {
return nil, fmt.Errorf("creating azure storage client: %w", err)
}
return &Client{
client: client,
azureStorageConfig: azureStorageConfig,
}, nil
}
func (c *Client) CreateContainer(ctx context.Context, containerName string) error {
if containerName == "" {
return ErrInvalidInput
}
_, err := c.client.CreateContainer(ctx, containerName, nil)
if err != nil {
return fmt.Errorf("creating container %s: %w", containerName, err)
}
return nil
}
func (c *Client) UploadBuffer(ctx context.Context, containerName, blobPath string, buffer []byte) error {
if containerName == "" || blobPath == "" || buffer == nil {
return ErrInvalidInput
}
_, err := c.client.UploadBuffer(ctx, containerName, blobPath, buffer, nil)
if err != nil {
return fmt.Errorf("uploading blob %s: %w", blobPath, err)
}
return nil
}
func (c *Client) UploadAndGetURL(ctx context.Context, blobPath string, buffer []byte) (string, error) {
if blobPath == "" || buffer == nil {
return "", ErrInvalidInput
}
fullPath := blobPath
if c.azureStorageConfig.Prefix != "" {
fullPath, _ = url.JoinPath(c.azureStorageConfig.Prefix, blobPath)
}
if err := c.UploadBuffer(ctx, c.azureStorageConfig.Container, fullPath, buffer); err != nil {
return "", err
}
blobEndpoint, _ := url.JoinPath(c.azureStorageConfig.ServiceURL, c.azureStorageConfig.AccountName)
blobURL, _ := url.JoinPath(blobEndpoint, c.azureStorageConfig.Container, fullPath)
return blobURL, nil
}

View File

@@ -1,55 +1,41 @@
package config
import (
"os"
"github.com/joho/godotenv"
"github.com/ilyakaznacheev/cleanenv"
log "github.com/sirupsen/logrus"
)
type appConfig struct {
SourceDbUrl string
SourceDbType string
TargetDbUrl string
TargetDbType string
type AzureStorageConfig struct {
AccountName string `env:"AZ_ACCOUNT_NAME"`
Container string `env:"AZ_CONTAINER"`
AccountKey string `env:"AZ_ACCOUNT_KEY"`
UseHTTPS bool `env:"AZ_USE_HTTPS" env-default:"true"`
ServiceURL string `env:"AZ_SERVICE_URL"`
Prefix string `env:"AZ_PREFIX"`
Enabled bool `env:"AZ_STORAGE_ENABLED"`
}
func loadEnv() {
err := godotenv.Load()
if err != nil {
log.Warn("Warning: could not load .env file")
}
type appConfig struct {
SourceDbUrl string `env:"SOURCE_DB_URL" env-required:"true"`
TargetDbUrl string `env:"TARGET_DB_URL" env-required:"true"`
LogLevel string `env:"LOG_LEVEL" env-default:"INFO"`
AzureStorage AzureStorageConfig
}
func getAppConfig() appConfig {
loadEnv()
var cfg appConfig
sourceDbUrl := os.Getenv("SOURCE_DB_URL")
if sourceDbUrl == "" {
log.Fatal("SOURCE_DB_URL environment variable not set")
err := cleanenv.ReadConfig(".env", &cfg)
if err != nil {
log.Warn("Could not load .env file")
}
sourceDbType := os.Getenv("SOURCE_DB_TYPE")
if sourceDbType == "" {
log.Fatal("SOURCE_DB_TYPE environment variable not set")
err = cleanenv.ReadEnv(&cfg)
if err != nil {
log.Fatalf("Error al cargar variables: %v", err)
}
targetDbUrl := os.Getenv("TARGET_DB_URL")
if targetDbUrl == "" {
log.Fatal("TARGET_DB_URL environment variable not set")
}
targetDbType := os.Getenv("TARGET_DB_TYPE")
if targetDbType == "" {
log.Fatal("TARGET_DB_TYPE environment variable not set")
}
return appConfig{
SourceDbUrl: sourceDbUrl,
SourceDbType: sourceDbType,
TargetDbUrl: targetDbUrl,
TargetDbType: targetDbType,
}
return cfg
}
var App appConfig = getAppConfig()

View File

@@ -0,0 +1,151 @@
package config
import (
"fmt"
"os"
"gopkg.in/yaml.v3"
)
type RetryConfig struct {
Attempts int `yaml:"attempts"`
BaseDelayMs int `yaml:"base_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"`
}
type ToStorageColumnConfig struct {
Source string `yaml:"source"`
Target string `yaml:"target"`
Mode string `yaml:"mode"`
}
type ToStorageConfig struct {
Columns []ToStorageColumnConfig `yaml:"columns"`
}
type JobConfig struct {
MaxExtractors int `yaml:"max_extractors"`
MaxLoaders int `yaml:"max_loaders"`
QueueSize int `yaml:"queue_size"`
BatchSize int `yaml:"batch_size"`
BatchesPerPartition int `yaml:"batches_per_partition"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
MaxChunkErrors int `yaml:"max_chunk_errors"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
}
type TableInfo struct {
Schema string `yaml:"schema"`
Table string `yaml:"table"`
}
type SourceTableInfo struct {
TableInfo `yaml:",inline"`
PrimaryKey string `yaml:"primary_key"`
}
type TargetTableInfo struct {
TableInfo `yaml:",inline"`
PreSQL []string `yaml:"pre_sql"`
PostSQL []string `yaml:"post_sql"`
}
type RangeConfig struct {
Min int64 `yaml:"min"`
Max int64 `yaml:"max"`
IsMinInclusive bool `yaml:"is_min_inclusive"`
IsMaxInclusive bool `yaml:"is_max_inclusive"`
}
type Job struct {
Name string `yaml:"name"`
Enabled bool `yaml:"enabled"`
SourceTable SourceTableInfo `yaml:"source"`
TargetTable TargetTableInfo `yaml:"target"`
JobConfig `yaml:",inline"`
Range RangeConfig `yaml:"range"`
}
type MigrationConfig struct {
MaxParallelWorkers int `yaml:"max_parallel_workers"`
SourceDbType string `yaml:"source_db_type"`
TargetDbType string `yaml:"target_db_type"`
Defaults JobConfig `yaml:"defaults"`
Jobs []Job `yaml:"jobs"`
}
type rawConfig struct {
MaxParallelWorkers int `yaml:"max_parallel_workers"`
SourceDbType string `yaml:"source_db_type"`
TargetDbType string `yaml:"target_db_type"`
Defaults JobConfig `yaml:"defaults"`
Jobs []yaml.Node `yaml:"jobs"`
}
func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
var raw rawConfig
if err := value.Decode(&raw); err != nil {
return err
}
c.MaxParallelWorkers = raw.MaxParallelWorkers
c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs {
job := Job{
JobConfig: raw.Defaults,
}
if err := node.Decode(&job); err != nil {
return err
}
job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition)
c.Jobs = append(c.Jobs, job)
}
return nil
}
const defaultConfigFileName string = "config.yaml"
func filenamesOrDefault(filenames []string) []string {
if len(filenames) == 0 {
return []string{defaultConfigFileName}
}
return filenames
}
func ReadMigrationConfig(filenames ...string) (MigrationConfig, error) {
filenames = filenamesOrDefault(filenames)
var data []byte
var err error
for _, filename := range filenames {
data, err = os.ReadFile(filename)
if err != nil {
continue
}
break
}
if err != nil {
return MigrationConfig{}, fmt.Errorf("Error reading config file: %v", err)
}
var config MigrationConfig
if err := yaml.Unmarshal(data, &config); err != nil {
return MigrationConfig{}, fmt.Errorf("Error parsing config file: %v", err)
}
return config, nil
}

View File

@@ -0,0 +1,18 @@
package convert
func ToInt64(v any) (int64, bool) {
switch t := v.(type) {
case int:
return int64(t), true
case int8:
return int64(t), true
case int16:
return int64(t), true
case int32:
return int64(t), true
case int64:
return int64(t), true
default:
return 0, false
}
}

View File

@@ -0,0 +1,61 @@
package custom_errors
import (
"context"
"math/rand"
"time"
)
func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
if retryCounter < 0 {
retryCounter = 0
}
delay := max(time.Duration(baseDelayMs)*time.Millisecond, 0)
maxDelay := time.Duration(maxDelayMs) * time.Millisecond
for i := 0; i < retryCounter; i++ {
if maxDelayMs > 0 && delay >= maxDelay {
delay = maxDelay
break
}
if delay == 0 {
break
}
delay *= 2
}
if maxDelayMs > 0 && delay > maxDelay {
delay = maxDelay
}
if maxJitterMs > 0 {
jitter := time.Duration(rand.Intn(maxJitterMs+1)) * time.Millisecond
delay += jitter
}
if delay < 0 {
delay = 0
}
return delay
}
func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) {
if delay <= 0 {
enqueue()
return
}
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return
case <-timer.C:
enqueue()
}
}()
}

View File

@@ -0,0 +1,119 @@
package custom_errors
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type ExtractorError struct {
Partition models.Partition
LastId int64
HasLastId bool
Msg string
}
func (e *ExtractorError) Error() string {
return e.Msg
}
func ExtractorErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxPartitionErrors int,
chErrorsIn <-chan ExtractorError,
chPartitionsOut chan<- models.Partition,
chJobErrorsOut chan<- JobError,
wgActivePartitions *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Partition.RetryCounter >= retryConfig.Attempts {
wgActivePartitions.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
newPartition := err.Partition
newPartition.RetryCounter++
delay := computeBackoffDelay(
newPartition.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
if err.HasLastId {
newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New()
newPartition.Range.Min = err.LastId
newPartition.Range.IsMinInclusive = false
}
requeueWithBackoff(ctx, delay, func() {
select {
case chPartitionsOut <- newPartition:
case <-ctx.Done():
return
}
})
}
}
}

View File

@@ -1,4 +1,4 @@
package main
package custom_errors
import (
"context"
@@ -21,7 +21,7 @@ func (e *JobError) Error() string {
return e.Msg
}
func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
func JobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
for {
if ctx.Err() != nil {
return nil
@@ -37,11 +37,11 @@ func jobErrorHandler(ctx context.Context, chErrorsIn <-chan JobError) error {
}
if err.ShouldCancelJob {
log.Error(err.Msg, " - ", err.Prev)
log.Errorf("(Fatal job error) - %v - %v", err.Msg, err.Prev)
return &err
}
log.Error(err.Msg, " - ", err.Prev)
log.Errorf("%v - %v", err.Msg, err.Prev)
}
}
}

View File

@@ -0,0 +1,107 @@
package custom_errors
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type LoaderError struct {
Batch models.Batch
Msg string
}
func (e *LoaderError) Error() string {
return e.Msg
}
func LoaderErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxChunkErrors int,
chErrorsIn <-chan LoaderError,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Batch.RetryCounter >= retryConfig.Attempts {
wgActiveBatches.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
err.Batch.RetryCounter++
delay := computeBackoffDelay(
err.Batch.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
requeueWithBackoff(ctx, delay, func() {
select {
case chBatchesOut <- err.Batch:
case <-ctx.Done():
return
}
})
}
}
}

View File

@@ -0,0 +1,19 @@
package dbwrapper
import "fmt"
type Factory func() DbWrapper
var drivers = make(map[string]Factory)
func Register(name string, factory Factory) {
drivers[name] = factory
}
func New(driverType string) (DbWrapper, error) {
factory, ok := drivers[driverType]
if !ok {
return nil, fmt.Errorf("driver not yet supported: %s", driverType)
}
return factory(), nil
}

View File

@@ -0,0 +1,176 @@
package dbwrapper
import (
"context"
"database/sql"
"fmt"
mssql "github.com/microsoft/go-mssqldb"
)
func init() {
Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
})
}
type mssqlRowResult struct {
row *sql.Row
}
func (mr *mssqlRowResult) Scan(dest ...any) error {
return mr.row.Scan(dest...)
}
type mssqlRowsResult struct {
columns []string
rows *sql.Rows
}
func (mr *mssqlRowsResult) Close() error {
return mr.rows.Close()
}
func (mr *mssqlRowsResult) Columns() ([]string, error) {
if mr.columns != nil {
return mr.columns, nil
}
return mr.rows.Columns()
}
func (mr *mssqlRowsResult) Err() error {
return mr.rows.Err()
}
func (mr *mssqlRowsResult) Next() bool {
return mr.rows.Next()
}
func (mr *mssqlRowsResult) Scan(dest ...any) error {
return mr.rows.Scan(dest...)
}
func (mr *mssqlRowsResult) Values() ([]any, error) {
columns, err := mr.Columns()
if err != nil {
return nil, err
}
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := mr.rows.Scan(scanArgs...); err != nil {
return nil, err
}
return rowValues, nil
}
type mssqlDbWrapper struct {
db *sql.DB
dialect string
}
func (mw *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error {
db, err := sql.Open("sqlserver", dbUrl)
if err != nil {
return err
}
if err := db.PingContext(ctx); err != nil {
if err := db.Close(); err != nil {
return err
}
return err
}
mw.db = db
return nil
}
func (mw *mssqlDbWrapper) Close() error {
return mw.db.Close()
}
func (mw *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, execErr := mw.db.ExecContext(ctx, query, args...)
if execErr != nil {
return ExecResult{}, execErr
}
affectedRows, err := result.RowsAffected()
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: affectedRows}, nil
}
func (mw *mssqlDbWrapper) GetDialect() string {
return mw.dialect
}
func (mw *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := mw.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &mssqlRowsResult{columns: nil, rows: rows}, nil
}
func (mw *mssqlDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := mw.db.QueryRowContext(ctx, query, args...)
return &mssqlRowResult{row: row}
}
func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
tx, err := mw.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
fullTableName := fmt.Sprintf("[%s].[%s]", schema, table)
stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, columnNames...))
if err != nil {
tx.Rollback()
return 0, err
}
for _, row := range rows {
_, err = stmt.ExecContext(ctx, row...)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
}
result, err := stmt.ExecContext(ctx)
if err != nil {
stmt.Close()
tx.Rollback()
return 0, err
}
if err := stmt.Close(); err != nil {
tx.Rollback()
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
rowsAffected, raErr := result.RowsAffected()
if raErr != nil {
return 0, nil
}
return rowsAffected, nil
}

View File

@@ -0,0 +1,128 @@
package dbwrapper
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func init() {
Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
})
}
type postgresRowResult struct {
row pgx.Row
}
func (pr *postgresRowResult) Scan(dest ...any) error {
return pr.row.Scan(dest...)
}
type postgresRowsResult struct {
columns []string
rows pgx.Rows
}
func (pr *postgresRowsResult) Close() error {
pr.rows.Close()
return nil
}
func (pr *postgresRowsResult) Columns() ([]string, error) {
if pr.columns != nil {
return pr.columns, nil
}
rawColumns := pr.rows.FieldDescriptions()
if rawColumns == nil {
return nil, errors.New("error retrieving columns")
}
columns := make([]string, 0, len(rawColumns))
for _, rc := range rawColumns {
columns = append(columns, rc.Name)
}
return columns, nil
}
func (pr *postgresRowsResult) Err() error {
return pr.rows.Err()
}
func (pr *postgresRowsResult) Next() bool {
return pr.rows.Next()
}
func (pr *postgresRowsResult) Scan(dest ...any) error {
return pr.rows.Scan(dest...)
}
func (pr *postgresRowsResult) Values() ([]any, error) {
return pr.rows.Values()
}
type postgresDbWrapper struct {
db *pgxpool.Pool
dialect string
}
func (pw *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error {
pool, err := pgxpool.New(ctx, dbUrl)
if err != nil {
return err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return err
}
pw.db = pool
return nil
}
func (pw *postgresDbWrapper) Close() error {
pw.db.Close()
return nil
}
func (pw *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
result, err := pw.db.Exec(ctx, query, args...)
if err != nil {
return ExecResult{}, err
}
return ExecResult{AffectedRows: result.RowsAffected()}, nil
}
func (pw *postgresDbWrapper) GetDialect() string {
return pw.dialect
}
func (pw *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
rows, err := pw.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
return &postgresRowsResult{columns: nil, rows: rows}, nil
}
func (pw *postgresDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
row := pw.db.QueryRow(ctx, query, args...)
return &postgresRowResult{row: row}
}
func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
affectedRows, err := pw.db.CopyFrom(ctx, pgx.Identifier{schema, table}, columnNames, pgx.CopyFromRows(rows))
if err != nil {
return 0, err
}
return affectedRows, nil
}

View File

@@ -0,0 +1,35 @@
package dbwrapper
import (
"context"
"errors"
)
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
type ExecResult struct {
AffectedRows int64
}
type RowsResult interface {
Close() error
Columns() ([]string, error)
Err() error
Next() bool
Scan(dest ...any) error
Values() ([]any, error)
}
type RowResult interface {
Scan(dest ...any) error
}
type DbWrapper interface {
Close() error
Connect(ctx context.Context, dbUrl string) error
Exec(ctx context.Context, query string, args ...any) (ExecResult, error)
GetDialect() string
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
}

View File

@@ -1,28 +0,0 @@
package db
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}

View File

@@ -0,0 +1,101 @@
package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
func Consume(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := extractor.Exec(
ctx,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
)
if rowsReadResult > 0 {
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
}
}
continue
}
wgActivePartitions.Done()
}
}
}

View File

@@ -0,0 +1,209 @@
package extractors
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type MssqlExtractor struct {
db dbwrapper.DbWrapper
}
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &MssqlExtractor{db: db}
}
func buildExtractQueryMssql(
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
includeRange bool,
isMinInclusive bool,
isMaxInclusive bool,
hasMin bool,
hasMax bool,
) string {
var sbQuery strings.Builder
sbQuery.WriteString("SELECT ")
if len(columns) == 0 {
sbQuery.WriteString("*")
} else {
for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
if col.Type() == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
}
if i < len(columns)-1 {
sbQuery.WriteString(", ")
}
}
}
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
if includeRange && (hasMin || hasMax) {
sbQuery.WriteString(" WHERE ")
if hasMin {
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
if isMinInclusive {
sbQuery.WriteString(" >=")
} else {
sbQuery.WriteString(" >")
}
sbQuery.WriteString(" @min")
}
if hasMin && hasMax {
sbQuery.WriteString(" AND ")
}
if hasMax {
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
if isMaxInclusive {
sbQuery.WriteString(" <=")
} else {
sbQuery.WriteString(" <")
}
sbQuery.WriteString(" @max")
}
}
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
return sbQuery.String()
}
func errorFromLastRow(
lastRow models.UnknownRowValues,
indexPrimaryKey int,
partition models.Partition,
previousError error,
) *custom_errors.ExtractorError {
lastIdRawValue := lastRow[indexPrimaryKey]
lastId, ok := convert.ToInt64(lastIdRawValue)
if !ok {
currentPartition := partition
currentPartition.RetryCounter = 3
return &custom_errors.ExtractorError{
Partition: currentPartition,
HasLastId: true,
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
}
}
return &custom_errors.ExtractorError{
Partition: partition,
HasLastId: true,
LastId: lastId,
Msg: previousError.Error(),
}
}
func (mssqlEx *MssqlExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error) {
hasMin := partition.HasRange && partition.Range.Min > 0
hasMax := partition.HasRange && partition.Range.Max > 0
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
var queryArgs []any
if hasMin {
queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min))
}
if hasMax {
queryArgs = append(queryArgs, sql.Named("max", partition.Range.Max))
}
rowsRead := 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
defer rows.Close()
batchRows := make([]models.UnknownRowValues, 0, batchSize)
for rows.Next() {
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := rows.Scan(scanArgs...); err != nil {
if len(batchRows) == 0 {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
lastRow := batchRows[len(batchRows)-1]
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
}
rowsRead++
batchRows = append(batchRows, rowValues)
if len(batchRows) >= batchSize {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
}
}
if err := rows.Err(); err != nil {
if errors.Is(err, ctx.Err()) {
return rowsRead, ctx.Err()
}
if len(batchRows) > 0 {
lastRow := batchRows[len(batchRows)-1]
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
}
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
if len(batchRows) > 0 {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
}
return rowsRead, nil
}

View File

@@ -0,0 +1,157 @@
package extractors
import (
"context"
"fmt"
"strings"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type PostgresExtractor struct {
db dbwrapper.DbWrapper
}
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &PostgresExtractor{db: db}
}
func buildExtractQueryPostgres(
sourceDbInfo config.SourceTableInfo,
columns []models.ColumnType,
includeRange bool,
isMinInclusive bool,
isMaxInclusive bool,
hasMin bool,
hasMax bool,
) string {
var sbColumns strings.Builder
if len(columns) == 0 {
sbColumns.WriteString("*")
} else {
for i, col := range columns {
if col.Type() == "GEOMETRY" {
sbColumns.WriteString(`ST_AsEWKB("`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`") AS "`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`"`)
} else {
sbColumns.WriteString(`"`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`"`)
}
if i < len(columns)-1 {
sbColumns.WriteString(", ")
}
}
}
query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table)
if includeRange && (hasMin || hasMax) {
query += " WHERE "
paramIdx := 1
if hasMin {
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
if isMinInclusive {
query += " >="
} else {
query += " >"
}
query += fmt.Sprintf(" $%d", paramIdx)
paramIdx++
}
if hasMin && hasMax {
query += " AND "
}
if hasMax {
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
if isMaxInclusive {
query += " <="
} else {
query += " <"
}
query += fmt.Sprintf(" $%d", paramIdx)
}
}
query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey)
return query
}
func (postgresEx *PostgresExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error) {
hasMin := partition.HasRange && partition.Range.Min > 0
hasMax := partition.HasRange && partition.Range.Max > 0
query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
var queryArgs []any
if hasMin {
queryArgs = append(queryArgs, partition.Range.Min)
}
if hasMax {
queryArgs = append(queryArgs, partition.Range.Max)
}
rowsRead := 0
rows, err := postgresEx.db.Query(ctx, query, queryArgs...)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
defer rows.Close()
batchRows := make([]models.UnknownRowValues, 0, batchSize)
for rows.Next() {
values, err := rows.Values()
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
rowsRead++
batchRows = append(batchRows, values)
if len(batchRows) >= batchSize {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
}
}
if err := rows.Err(); err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
if len(batchRows) > 0 {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, nil
}
}
return rowsRead, nil
}

View File

@@ -0,0 +1,117 @@
package loaders
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgconn"
)
type GenericLoader struct {
db dbwrapper.DbWrapper
}
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
return &GenericLoader{db: db}
}
func (gl *GenericLoader) ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int, error) {
_, err := gl.db.SaveMassive(
ctx,
tableInfo.Schema,
tableInfo.Table,
colNames,
batch.Rows,
)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == "23505" {
return 0, &custom_errors.JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
Prev: err,
}
}
}
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
}
return len(batch.Rows), nil
}
func (gl *GenericLoader) Exec(
ctx context.Context,
tableInfo config.TargetTableInfo,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chErrorsOut chan<- custom_errors.LoaderError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
rowsLoaded *int64,
) {
colNames := mapSlice(columns, func(col models.ColumnType) string {
return col.Name()
})
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
if err != nil {
var ldError *custom_errors.LoaderError
var jobError *custom_errors.JobError
if errors.As(err, &ldError) {
select {
case <-ctx.Done():
return
case chErrorsOut <- *ldError:
}
} else if errors.As(err, &jobError) {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
}
}
continue
}
wgActiveBatches.Done()
atomic.AddInt64(rowsLoaded, int64(processedRows))
}
}
}

View File

@@ -0,0 +1,11 @@
package loaders
func mapSlice[T any, V any](input []T, mapper func(T) V) []V {
result := make([]V, len(input))
for i, v := range input {
result[i] = mapper(v)
}
return result
}

View File

@@ -0,0 +1,55 @@
package table_analyzers
import (
"context"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func PartitionRangeGenerator(
ctx context.Context,
tableAnalyzer etl.TableAnalyzer,
tableInfo config.TableInfo,
partitionColumn string,
rowsPerPartition int64,
jobRange config.RangeConfig,
) ([]models.Partition, error) {
if jobRange.Min > 0 {
return []models.Partition{{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
Min: jobRange.Min,
Max: jobRange.Max,
IsMinInclusive: jobRange.IsMinInclusive,
IsMaxInclusive: jobRange.IsMaxInclusive,
},
}}, nil
}
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
if err != nil {
return nil, err
}
if rowsCount <= rowsPerPartition {
return []models.Partition{{
Id: uuid.New(),
HasRange: false,
RetryCounter: 0,
}}, nil
}
partitionsCount := rowsCount / rowsPerPartition
partitions, err := tableAnalyzer.CalculatePartitionRanges(ctx, tableInfo, partitionColumn, partitionsCount)
if err != nil {
return nil, err
}
return partitions, nil
}

View File

@@ -0,0 +1,252 @@
package table_analyzers
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type MssqlTableAnalyzer struct {
db dbwrapper.DbWrapper
}
func NewMssqlTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
return &MssqlTableAnalyzer{db: db}
}
const mssqlColumnMetadataQuery string = `
SELECT
c.name AS name,
t.name AS user_type,
CASE WHEN t.is_user_defined = 0 THEN t.name ELSE bt.name END AS system_type,
c.is_nullable AS nullable,
c.max_length AS max_length,
c.precision AS precision,
c.scale AS scale
FROM sys.columns c
JOIN sys.types t ON c.user_type_id = t.user_type_id
LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
JOIN sys.tables st ON c.object_id = st.object_id
JOIN sys.schemas s ON st.schema_id = s.schema_id
WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
ORDER BY c.column_id;`
type rawColumnMssql struct {
name string
userType string
systemType string
nullable bool
maxLength int64
precision int64
scale int64
}
func (ta *MssqlTableAnalyzer) systemTypeToUnifiedType(systemType string) string {
systemType = strings.ToLower(systemType)
if systemType == "varchar" || systemType == "char" || systemType == "nvarchar" || systemType == "nchar" || systemType == "text" || systemType == "ntext" {
return "STRING"
}
if systemType == "int" || systemType == "int4" || systemType == "integer" || systemType == "smallint" || systemType == "int2" || systemType == "bigint" || systemType == "int8" || systemType == "tinyint" {
return "INTEGER"
}
if systemType == "decimal" || systemType == "numeric" {
return "DECIMAL"
}
if systemType == "float" || systemType == "real" || systemType == "double precision" {
return "FLOAT"
}
if systemType == "bit" || systemType == "boolean" {
return "BOOLEAN"
}
if systemType == "date" {
return "DATE"
}
if systemType == "time" || systemType == "time without time zone" {
return "TIME"
}
if systemType == "datetime" || systemType == "datetime2" || systemType == "timestamp" || systemType == "timestamptz" || systemType == "timestamp with time zone" {
return "TIMESTAMP"
}
if systemType == "binary" || systemType == "varbinary" || systemType == "image" || systemType == "bytea" {
return "BINARY"
}
if systemType == "uniqueidentifier" || systemType == "uuid" {
return "UUID"
}
if systemType == "json" {
return "JSON"
}
if systemType == "geometry" || systemType == "geography" {
return "GEOMETRY"
}
return strings.ToUpper(systemType)
}
func (ta *MssqlTableAnalyzer) rawColumnToColumnType(rawColumn rawColumnMssql) models.ColumnType {
const nullValue int64 = -1
stringTypes := map[string]bool{"varchar": true, "char": true, "nvarchar": true, "nchar": true, "text": true, "ntext": true}
decimalTypes := map[string]bool{"decimal": true, "numeric": true}
if stringTypes[rawColumn.systemType] {
if rawColumn.systemType == "nvarchar" || rawColumn.systemType == "nchar" {
if rawColumn.maxLength > 0 {
rawColumn.maxLength = rawColumn.maxLength / 2
}
}
rawColumn.precision, rawColumn.scale = nullValue, nullValue
} else if decimalTypes[rawColumn.systemType] {
rawColumn.maxLength = nullValue
} else {
rawColumn.maxLength, rawColumn.precision, rawColumn.scale = nullValue, nullValue, nullValue
}
columnType := models.NewColumnType(
rawColumn.name,
rawColumn.maxLength != nullValue,
rawColumn.precision != nullValue || rawColumn.scale != nullValue,
rawColumn.userType,
rawColumn.systemType,
ta.systemTypeToUnifiedType(rawColumn.systemType),
rawColumn.nullable,
rawColumn.maxLength,
rawColumn.precision,
rawColumn.scale,
)
return columnType
}
func (ta *MssqlTableAnalyzer) QueryColumnTypes(
ctx context.Context,
tableInfo config.TableInfo,
) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
rows, err := ta.db.Query(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
if err != nil {
return nil, err
}
defer rows.Close()
var columnTypes []models.ColumnType
for rows.Next() {
var rawColumn rawColumnMssql
if err := rows.Scan(
&rawColumn.name,
&rawColumn.userType,
&rawColumn.systemType,
&rawColumn.nullable,
&rawColumn.maxLength,
&rawColumn.precision,
&rawColumn.scale,
); err != nil {
return nil, err
}
columnTypes = append(columnTypes, ta.rawColumnToColumnType(rawColumn))
}
return columnTypes, nil
}
func (ta *MssqlTableAnalyzer) EstimateTotalRows(
ctx context.Context,
tableInfo config.TableInfo,
) (int64, error) {
query := `
SELECT SUM(p.rows) AS count
FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id
JOIN sys.partitions p ON t.object_id = p.object_id
WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
GROUP BY t.name`
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
var rowsCount int64
err := ta.db.QueryRow(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
if err != nil {
return 0, err
}
return rowsCount, nil
}
func (ta *MssqlTableAnalyzer) CalculatePartitionRanges(
ctx context.Context,
tableInfo config.TableInfo,
partitionColumn string,
maxPartitions int64,
) ([]models.Partition, error) {
query := fmt.Sprintf(`
SELECT
MIN([%s]) AS lower_limit,
MAX([%s]) AS upper_limit
FROM (SELECT [%s], NTILE(@maxPartitions) OVER (ORDER BY [%s]) AS batch_id FROM [%s].[%s]) AS T
GROUP BY batch_id
ORDER BY batch_id`,
partitionColumn,
partitionColumn,
partitionColumn,
partitionColumn,
tableInfo.Schema,
tableInfo.Table)
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
if err != nil {
return nil, err
}
defer rows.Close()
partitions := make([]models.Partition, 0, maxPartitions)
for rows.Next() {
partition := models.Partition{
Id: uuid.New(),
HasRange: true,
RetryCounter: 0,
Range: models.PartitionRange{
IsMinInclusive: true,
},
}
if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
return nil, err
}
partitions = append(partitions, partition)
}
if err := rows.Err(); err != nil {
return nil, err
}
return partitions, nil
}

View File

@@ -0,0 +1,174 @@
package table_analyzers
import (
"context"
"strings"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type PostgresTableAnalyzer struct {
db dbwrapper.DbWrapper
}
func NewPostgresTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
return &PostgresTableAnalyzer{db: db}
}
const postgresColumnMetadataQuery string = `
SELECT
c.column_name AS name,
c.data_type AS user_type,
c.udt_name AS system_type,
(CASE WHEN c.is_nullable = 'YES' THEN TRUE ELSE FALSE END) AS nullable,
COALESCE(c.character_maximum_length, -1) AS max_length,
COALESCE(c.numeric_precision, -1) AS precision,
COALESCE(c.numeric_scale, -1) AS scale
FROM information_schema.columns c
WHERE c.table_schema = $1 AND c.table_name = $2
ORDER BY c.ordinal_position;`
type rawColumnPostgres struct {
name string
userType string
systemType string
nullable bool
maxLength int64
precision int64
scale int64
}
func (ta *PostgresTableAnalyzer) systemTypeToUnifiedType(systemType string) string {
systemType = strings.ToLower(systemType)
if systemType == "varchar" || systemType == "char" || systemType == "nvarchar" || systemType == "nchar" || systemType == "text" || systemType == "ntext" {
return "STRING"
}
if systemType == "int" || systemType == "int4" || systemType == "integer" || systemType == "smallint" || systemType == "int2" || systemType == "bigint" || systemType == "int8" || systemType == "tinyint" {
return "INTEGER"
}
if systemType == "decimal" || systemType == "numeric" {
return "DECIMAL"
}
if systemType == "float" || systemType == "real" || systemType == "double precision" {
return "FLOAT"
}
if systemType == "bit" || systemType == "boolean" {
return "BOOLEAN"
}
if systemType == "date" {
return "DATE"
}
if systemType == "time" || systemType == "time without time zone" {
return "TIME"
}
if systemType == "datetime" || systemType == "datetime2" || systemType == "timestamp" || systemType == "timestamptz" || systemType == "timestamp with time zone" {
return "TIMESTAMP"
}
if systemType == "binary" || systemType == "varbinary" || systemType == "image" || systemType == "bytea" {
return "BINARY"
}
if systemType == "uniqueidentifier" || systemType == "uuid" {
return "UUID"
}
if systemType == "json" {
return "JSON"
}
if systemType == "geometry" || systemType == "geography" {
return "GEOMETRY"
}
return strings.ToUpper(systemType)
}
func (ta *PostgresTableAnalyzer) rawColumnToColumnType(rawColumn rawColumnPostgres) models.ColumnType {
const nullValue int64 = -1
stringTypes := map[string]bool{"varchar": true, "char": true, "text": true}
decimalTypes := map[string]bool{"decimal": true, "numeric": true}
if stringTypes[rawColumn.systemType] {
rawColumn.precision, rawColumn.scale = nullValue, nullValue
} else if decimalTypes[rawColumn.systemType] {
rawColumn.maxLength = nullValue
} else {
rawColumn.maxLength, rawColumn.precision, rawColumn.scale = nullValue, nullValue, nullValue
}
return models.NewColumnType(
rawColumn.name,
rawColumn.maxLength != nullValue,
rawColumn.precision != nullValue || rawColumn.scale != nullValue,
rawColumn.userType,
rawColumn.systemType,
ta.systemTypeToUnifiedType(rawColumn.systemType),
rawColumn.nullable,
rawColumn.maxLength,
rawColumn.precision,
rawColumn.scale,
)
}
func (ta *PostgresTableAnalyzer) QueryColumnTypes(
ctx context.Context,
tableInfo config.TableInfo,
) ([]models.ColumnType, error) {
localCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table)
if err != nil {
return nil, err
}
defer rows.Close()
var colTypes []models.ColumnType
for rows.Next() {
var column rawColumnPostgres
if err := rows.Scan(
&column.name,
&column.userType,
&column.systemType,
&column.nullable,
&column.maxLength,
&column.precision,
&column.scale,
); err != nil {
return nil, err
}
colTypes = append(colTypes, ta.rawColumnToColumnType(column))
}
return colTypes, nil
}
func (ta *PostgresTableAnalyzer) EstimateTotalRows(
ctx context.Context,
tableInfo config.TableInfo,
) (int64, error) {
return 0, nil
}
func (ta *PostgresTableAnalyzer) CalculatePartitionRanges(
ctx context.Context,
tableInfo config.TableInfo,
partitionColumn string,
maxPartitions int64,
) ([]models.Partition, error) {
return []models.Partition{}, nil
}

View File

@@ -0,0 +1,225 @@
package transformers
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
type MssqlTransformer struct {
toStorage config.ToStorageConfig
sourceTable config.SourceTableInfo
azureClient *azure.Client
}
func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.SourceTableInfo, azureClient *azure.Client) etl.Transformer {
return &MssqlTransformer{
toStorage: toStorage,
sourceTable: sourceTable,
azureClient: azureClient,
}
}
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
var plan []etl.ColumnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
}
func computeStorageTransformationPlan(
ctx context.Context,
azureClient *azure.Client,
toStorage config.ToStorageConfig,
sourceColumns []models.ColumnType,
sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
if azureClient == nil || len(toStorage.Columns) == 0 {
return nil
}
colIndex := make(map[string]int, len(sourceColumns))
for i, col := range sourceColumns {
colIndex[strings.ToUpper(col.Name())] = i
}
var plan []etl.ColumnTransformPlan
for _, storageCol := range toStorage.Columns {
if storageCol.Mode != "REFERENCE_ONLY" {
log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
continue
}
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
if !ok {
log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
continue
}
sourceColName := storageCol.Source
schema := sourceTable.Schema
table := sourceTable.Table
plan = append(plan, etl.ColumnTransformPlan{
Index: idx,
Fn: func(v any) (any, error) {
if v == nil {
return nil, nil
}
b, ok := v.([]byte)
if !ok {
log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
schema, table, sourceColName, v)
return v, nil
}
start := time.Now()
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
if err != nil {
return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
}
log.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
return blobURL, nil
},
})
}
return plan
}
const processBatchCtxCheck = 4096
func (mssqlTr *MssqlTransformer) ProcessBatch(
ctx context.Context,
batch *models.Batch,
transformationPlan []etl.ColumnTransformPlan,
) error {
for i, rowValues := range batch.Rows {
if i%processBatchCtxCheck == 0 {
if err := ctx.Err(); err != nil {
return err
}
}
for _, task := range transformationPlan {
val := rowValues[task.Index]
if val == nil {
continue
}
transformed, err := task.Fn(val)
if err != nil {
return err
}
rowValues[task.Index] = transformed
}
}
return nil
}
func (mssqlTr *MssqlTransformer) Exec(
ctx context.Context,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
) {
transformationPlan := computeTransformationPlan(columns)
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
transformationPlan = append(transformationPlan, storagePlan...)
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
if len(transformationPlan) == 0 {
select {
case chBatchesOut <- batch:
wgActiveBatches.Add(1)
continue
case <-ctx.Done():
return
}
}
err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
if err != nil {
if errors.Is(err, ctx.Err()) {
return
}
select {
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
}
return
}
select {
case chBatchesOut <- batch:
case <-ctx.Done():
return
}
wgActiveBatches.Add(1)
}
}
}

View File

@@ -0,0 +1 @@
package transformers

View File

@@ -1,4 +1,4 @@
package main
package transformers
import (
"encoding/binary"

85
internal/app/etl/types.go Normal file
View File

@@ -0,0 +1,85 @@
package etl
import (
"context"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type Extractor interface {
Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error)
}
type TransformerFunc func(any) (any, error)
type ColumnTransformPlan struct {
Index int
Fn TransformerFunc
}
type Transformer interface {
ProcessBatch(
ctx context.Context,
batch *models.Batch,
transformationPlan []ColumnTransformPlan,
) error
Exec(
ctx context.Context,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chBactchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
)
}
type Loader interface {
ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int, error)
Exec(
ctx context.Context,
tableInfo config.TargetTableInfo,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chErrorsOut chan<- custom_errors.LoaderError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
rowsLoaded *int64,
)
}
type TableAnalyzer interface {
QueryColumnTypes(
ctx context.Context,
tableInfo config.TableInfo,
) ([]models.ColumnType, error)
EstimateTotalRows(
ctx context.Context,
tableInfo config.TableInfo,
) (int64, error)
CalculatePartitionRanges(
ctx context.Context,
tableInfo config.TableInfo,
partitionColumn string,
maxPartitions int64,
) ([]models.Partition, error)
}

View File

@@ -1,4 +1,4 @@
package main
package models
type ColumnType struct {
name string
@@ -42,3 +42,29 @@ func (c *ColumnType) Nullable() bool {
func (c *ColumnType) Type() string {
return c.unifiedType
}
func NewColumnType(
name string,
hasMaxLength bool,
hasPrecisionScale bool,
userType string,
systemType string,
unifiedType string,
nullable bool,
maxLength int64,
precision int64,
scale int64,
) ColumnType {
return ColumnType{
name,
hasMaxLength,
hasPrecisionScale,
userType,
systemType,
unifiedType,
nullable,
maxLength,
precision,
scale,
}
}

View File

@@ -0,0 +1,41 @@
package models
import (
"time"
"github.com/google/uuid"
)
type UnknownRowValues = []any
type Batch struct {
Id uuid.UUID
PartitionId uuid.UUID
Rows []UnknownRowValues
RetryCounter int
}
type PartitionRange struct {
Min int64
Max int64
IsMinInclusive bool
IsMaxInclusive bool
}
type Partition struct {
Id uuid.UUID
ParentId uuid.UUID
Range PartitionRange
HasRange bool
RetryCounter int
}
type JobResult struct {
JobName string
StartTime time.Time
Duration time.Duration
RowsRead int64
RowsLoaded int64
RowsFailed int64
Error error
}

44
scripts/az-blob/main.go Normal file
View File

@@ -0,0 +1,44 @@
package main
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
)
func main() {
cfg := config.App.AzureStorage
containerName := cfg.Container
client, err := azure.NewClient(cfg)
if err != nil {
log.Fatalf("Error creando cliente: %v", err)
}
ctx := context.Background()
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
blobName := fmt.Sprintf("%sarchivo-%d.txt", cfg.Prefix, id)
content := fmt.Sprintf("Contenido aleatorio: %d", rand.Intn(100000))
err := client.UploadBuffer(ctx, containerName, blobName, []byte(content))
if err != nil {
log.Printf("Fallo al subir %s: %v", blobName, err)
} else {
fmt.Printf("Subido exitosamente: %s\n", blobName)
}
}(i)
}
wg.Wait()
}

View File

@@ -0,0 +1,17 @@
package main
import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
log "github.com/sirupsen/logrus"
)
func main() {
log.SetLevel(log.DebugLevel)
migrationConfig, err := config.ReadMigrationConfig()
if err != nil {
log.Fatalf("error leyendo configuracion: %v", err)
}
log.Debugf("Config: %+v", migrationConfig)
}

View File

@@ -8,13 +8,32 @@ import (
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
)
func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
return nil, fmt.Errorf("unable to connect to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("unable to ping database: %w", err)
}
return pool, nil
}
func Close(pool *pgxpool.Pool) {
if pool != nil {
pool.Close()
}
}
func main() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
@@ -27,8 +46,8 @@ func main() {
ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
sourcePool, err := db.Connect(ctxSource, config.App.SourceDbUrl)
defer db.Close(sourcePool)
sourcePool, err := Connect(ctxSource, config.App.SourceDbUrl)
defer Close(sourcePool)
if err != nil {
log.Fatal(err)
}
@@ -37,8 +56,8 @@ func main() {
ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
targetPool, err := db.Connect(ctxTarget, config.App.TargetDbUrl)
defer db.Close(targetPool)
targetPool, err := Connect(ctxTarget, config.App.TargetDbUrl)
defer Close(targetPool)
if err != nil {
log.Fatal(err)
}