20 Commits

Author SHA1 Message Date
ec96532d04 refactor: optimize row handling in mssql extractor and transformer 2026-04-17 01:10:45 -05:00
46597c4ffd refactor: implement extractor retry logic and streamline extractor interface 2026-04-17 00:33:49 -05:00
15d1b96849 refactor: streamline error handling and remove redundant code in mssql extractor 2026-04-17 00:23:01 -05:00
73b65e2a3f refactor: remove extractor error channel and simplify retry logic in mssql and postgres extractors 2026-04-17 00:07:51 -05:00
DiegoAlessandroMotta 1c3db39b21 update extractor interface 2026-04-16 23:49:23 -05:00
39c0d99502 feat: extend context timeout to 1 minute for database queries in mssql.go and postgres.go 2026-04-16 13:19:46 -05:00
b418ded78b feat: update SQL query to filter out specific column names in mssql.go 2026-04-16 13:14:39 -05:00
0d0511716f feat: add row count mismatch error handling in processMigrationJob and update SQL query to exclude additional graph-related columns 2026-04-16 12:46:55 -05:00
67fb0148ae feat: add Makefile for building binaries across platforms 2026-04-16 11:51:05 -05:00
098cf36e3c feat: comment out TRUNCATE statements in pre_sql for MANZANA and PUERTO jobs 2026-04-16 11:46:24 -05:00
5484716b81 feat: add source and target database type fields to MigrationConfig 2026-04-16 09:08:21 -05:00
df4c3bc390 feat: refactor db handling to use db-wrapper package; enhance connection management and result handling for MSSQL and Postgres 2026-04-16 08:48:29 -05:00
ea41a7c218 feat: register MSSQL and Postgres drivers in db-wrapper for improved factory pattern support 2026-04-15 23:09:56 -05:00
f09284ecdc feat: enhance db-wrapper with improved MSSQL and Postgres implementations; add row result handling and dialect support 2026-04-15 22:55:14 -05:00
0384d5423f feat: add range configuration to job settings for enhanced data processing control 2026-04-15 20:23:45 -05:00
1ce3d9e153 refactor: update partition handling to use Range struct for better clarity and consistency 2026-04-15 20:23:45 -05:00
DiegoAlessandroMotta ed889b740a add db-wrapper package types 2026-04-15 20:22:23 -05:00
803f8988b8 refactor: update extractor interfaces to return row counts instead of using pointers for rows read 2026-04-13 19:25:18 -05:00
33c9cd9c3e feat: implement database wrapper interfaces for MSSQL and Postgres; enhance migration job processing with pre and post SQL execution 2026-04-13 07:57:18 -05:00
85074da2ec feat: add max partition and chunk error limits to extractor and loader error handlers 2026-04-12 20:57:31 -05:00
28 changed files with 913 additions and 424 deletions

.gitignore vendored

@@ -4,6 +4,7 @@
 *.dll
 *.so
 *.dylib
+bin/
 # Test binary, built with `go test -c`
 *.test

Makefile Normal file

@@ -0,0 +1,80 @@
.PHONY: build build-linux build-windows build-all clean help
# Variables
BINARY_NAME=go-migrate
CMD_PATH=./cmd/go_migrate
OUTPUT_DIR=bin
VERSION?=$(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
GIT_COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
# Flags de compilación
LD_FLAGS=-ldflags="-s -w -X main.Version=$(VERSION) -X main.BuildTime=$(BUILD_TIME) -X main.GitCommit=$(GIT_COMMIT)"
# Default: compilar para el SO actual
build: build-$(OS)
ifeq ($(OS),Windows_NT)
build-native: build-windows
else
build-native: build-linux
endif
# Compilar para Linux (sin CGO para máxima compatibilidad)
build-linux:
@echo "Compilando para Linux..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-amd64"
# Compilar para Windows
build-windows:
@echo "Compilando para Windows..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-windows-amd64.exe"
# Compilar para ambas plataformas
build-all: build-linux build-windows
@echo ""
@echo "Binarios compilados:"
@ls -lh $(OUTPUT_DIR)/$(BINARY_NAME)*
# Compilar para Linux arm64 (opcional, para Raspberry Pi, etc.)
build-linux-arm64:
@echo "Compilando para Linux ARM64..."
@mkdir -p $(OUTPUT_DIR)
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build \
$(LD_FLAGS) \
-o $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64 \
$(CMD_PATH)
@echo "Binario creado: $(OUTPUT_DIR)/$(BINARY_NAME)-linux-arm64"
# Limpiar binarios
clean:
@echo "Limpiando binarios..."
@rm -rf $(OUTPUT_DIR)
@echo "Limpieza completada"
# Ayuda
help:
@echo "Comandos disponibles:"
@echo ""
@echo " make build - Compilar para el SO actual (Linux/Windows)"
@echo " make build-linux - Compilar para Linux x86_64"
@echo " make build-windows - Compilar para Windows x86_64"
@echo " make build-linux-arm64 - Compilar para Linux ARM64 (opcional)"
@echo " make build-all - Compilar para Linux y Windows"
@echo " make clean - Eliminar binarios compilados"
@echo " make help - Mostrar esta ayuda"
@echo ""
@echo "Ejemplos de uso:"
@echo " make build-all # Crear binarios para ambas plataformas"
@echo " make build-linux OS= # Crear solo para Linux"
@echo ""

@@ -13,5 +13,5 @@ func configureLog() {
 		DisableSorting: false,
 		PadLevelText:   true,
 	})
-	log.SetLevel(log.InfoLevel)
+	log.SetLevel(log.DebugLevel)
 }

@@ -2,17 +2,17 @@ package main

 import (
 	"context"
-	"database/sql"
 	"sync"
 	"time"

 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers"
-	"github.com/jackc/pgx/v5/pgxpool"
 	log "github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"
 )

@@ -32,11 +32,33 @@ func main() {
 	log.Info("=== Starting migration ===")

-	sourceDb, targetDb, connError := connectToDatabases()
-	if connError != nil {
-		log.Fatal("Connection error: ", connError)
-	}
+	var wgConnect errgroup.Group
+	var sourceDb, targetDb dbwrapper.DbWrapper
+
+	wgConnect.Go(func() error {
+		var err error
+		sourceDb, err = connectWithTimeout(ctx, migrationConfig.SourceDbType, config.App.SourceDbUrl, 20*time.Second)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+
+	wgConnect.Go(func() error {
+		var err error
+		targetDb, err = connectWithTimeout(ctx, migrationConfig.TargetDbType, config.App.TargetDbUrl, 20*time.Second)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+
+	if err := wgConnect.Wait(); err != nil {
+		log.Error("Connection error: ", err)
+		return
+	}

 	defer sourceDb.Close()
 	defer targetDb.Close()

@@ -49,8 +71,10 @@
 		status := "OK"
 		if res.Error != nil {
 			status = "FAILED"
+			log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v | Error: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration, res.Error)
+		} else {
+			log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
 		}
-		log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)

 		totalProcessed += res.RowsLoaded
 		if res.Error != nil {

@@ -67,8 +91,8 @@
 func processMigrationJobs(
 	ctx context.Context,
-	sourceDb *sql.DB,
-	targetDb *pgxpool.Pool,
+	sourceDb dbwrapper.DbWrapper,
+	targetDb dbwrapper.DbWrapper,
 	jobs []config.Job,
 	maxParallelWorkers int,
 ) []JobResult {

@@ -103,6 +127,7 @@ func processMigrationJobs(
 			log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
 			res := processMigrationJob(
 				ctx,
+				targetDb,
 				sourceTableAnalyzer,
 				targetTableAnalyzer,
 				extractor,

@@ -133,3 +158,19 @@ func processMigrationJobs(
 	return finalResults
 }
+
+func connectWithTimeout(ctx context.Context, dbType string, dbUrl string, timeout time.Duration) (dbwrapper.DbWrapper, error) {
+	localCtx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	sourceDb, err := dbwrapper.New(dbType)
+	if err != nil {
+		return nil, err
+	}
+
+	if err = sourceDb.Connect(localCtx, dbUrl); err != nil {
+		return nil, err
+	}
+
+	return sourceDb, nil
+}
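The two connections now open concurrently; errgroup.Wait blocks for both goroutines and returns the first non-nil error. The pattern in isolation, as a minimal self-contained sketch (dial is a hypothetical stand-in for connectWithTimeout):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// dial stands in for connectWithTimeout; it only simulates a handshake.
func dial(ctx context.Context, name string) error {
	select {
	case <-time.After(100 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return fmt.Errorf("%s: %w", name, ctx.Err())
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	var g errgroup.Group
	g.Go(func() error { return dial(ctx, "source") })
	g.Go(func() error { return dial(ctx, "target") })

	// Wait blocks until both goroutines return and yields the first error.
	if err := g.Wait(); err != nil {
		fmt.Println("connection error:", err)
	}
}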

@@ -2,13 +2,16 @@ package main

 import (
 	"context"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"

 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
+	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 	log "github.com/sirupsen/logrus"

@@ -17,6 +20,7 @@ import (
 func processMigrationJob(
 	ctx context.Context,
+	targetDbWrapper dbwrapper.DbWrapper,
 	sourceTableAnalyzer etl.TableAnalyzer,
 	targetTableAnalyzer etl.TableAnalyzer,
 	extractor etl.Extractor,

@@ -24,7 +28,7 @@ func processMigrationJob(
 	loader etl.Loader,
 	job config.Job,
 ) JobResult {
-	jobCtx, cancel := context.WithCancel(ctx)
+	localCtx, cancel := context.WithCancel(ctx)
 	defer cancel()

 	result := JobResult{

@@ -39,7 +43,7 @@
 	wgQueryColumnTypes.Go(func() error {
 		var err error
-		sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(jobCtx, job.SourceTable.TableInfo)
+		sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo)
 		if err != nil {
 			return err
 		}

@@ -49,7 +53,7 @@
 	wgQueryColumnTypes.Go(func() error {
 		var err error
-		targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(jobCtx, job.TargetTable.TableInfo)
+		targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo)
 		if err != nil {
 			return err
 		}

@@ -63,8 +67,15 @@
 		return result
 	}

+	for _, query := range job.PreSQL {
+		if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
+			result.Error = err
+			return result
+		}
+	}
+
 	partitions, err := table_analyzers.PartitionRangeGenerator(
-		jobCtx,
+		localCtx,
 		sourceTableAnalyzer,
 		job.SourceTable.TableInfo,
 		job.SourceTable.PrimaryKey,

@@ -75,7 +86,6 @@
 	}

 	chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
-	chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
 	chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
 	chPartitions := make(chan models.Partition, job.QueueSize)
 	chBatchesRaw := make(chan models.Batch, job.QueueSize)

@@ -88,24 +98,17 @@
 	var wgLoaders sync.WaitGroup

 	go func() {
-		if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
+		if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
 			log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
 			cancel()
 			result.Error = err
 		}
 	}()

-	go custom_errors.ExtractorErrorHandler(
-		jobCtx,
-		job.Retry,
-		chExtractorErrors,
-		chPartitions,
-		chJobErrors,
-		&wgActivePartitions,
-	)
-
 	go custom_errors.LoaderErrorHandler(
-		jobCtx,
+		localCtx,
 		job.Retry,
+		job.MaxChunkErrors,
 		chLoadersErrors,
 		chBatchesTransformed,
 		chJobErrors,

@@ -117,14 +120,14 @@
 	for range maxExtractors {
 		wgExtractors.Go(func() {
-			extractor.Exec(
-				jobCtx,
+			extractors.Consume(
+				localCtx,
+				extractor,
 				job.SourceTable,
 				sourceColTypes,
 				job.BatchSize,
 				chPartitions,
 				chBatchesRaw,
-				chExtractorErrors,
 				chJobErrors,
 				&wgActivePartitions,
 				&rowsRead,

@@ -144,7 +147,7 @@
 	for range maxExtractors {
 		wgTransformers.Go(func() {
 			transformer.Exec(
-				jobCtx,
+				localCtx,
 				sourceColTypes,
 				chBatchesRaw,
 				chBatchesTransformed,

@@ -159,7 +162,7 @@
 	for range job.MaxLoaders {
 		wgLoaders.Go(func() {
 			loader.Exec(
-				jobCtx,
+				localCtx,
 				job.TargetTable,
 				targetColTypes,
 				chBatchesTransformed,

@@ -178,8 +181,6 @@
 		log.Debugf("wgActivePartitions is empty (%v)", job.Name)
 		close(chPartitions)
 		log.Debugf("chPartitions is closed (%v)", job.Name)
-		close(chExtractorErrors)
-		log.Debugf("chExtractorErrors is closed (%v)", job.Name)

 		wgExtractors.Wait()
 		log.Debugf("wgExtractors is empty (%v)", job.Name)

@@ -202,8 +203,15 @@
 		cancel()
 	}()

+	for _, query := range job.PostSQL {
+		if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
+			result.Error = err
+			return result
+		}
+	}
+
 	log.Debugf("waiting for local context to be done (%v)", job.Name)
-	<-jobCtx.Done()
+	<-localCtx.Done()
 	log.Debugf("local context done (%v)", job.Name)

 	if ctx.Err() != nil {

@@ -215,5 +223,9 @@
 	result.RowsLoaded = atomic.LoadInt64(&rowsLoaded)
 	result.RowsFailed = atomic.LoadInt64(&rowsFailed)

+	if result.RowsRead != result.RowsLoaded {
+		result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
+	}
+
 	return result
 }

@@ -10,6 +10,8 @@ defaults:
   batches_per_partition: 8
   truncate_target: true
   truncate_method: TRUNCATE # TRUNCATE | DELETE
+  max_partition_errrors: 5
+  max_chunk_errors: 5
   retry:
     attempts: 3
     base_delay_ms: 500

@@ -26,6 +28,14 @@ jobs:
     target:
       schema: Cartografia
       table: MANZANA
+    pre_sql:
+      - 'SELECT 1'
+      # - 'TRUNCATE TABLE "Cartografia"."MANZANA"'
+    range:
+      min: 1000000
+      max: 2000000
+      is_min_inclusive: false
+      is_max_inclusive: true

   - name: red_puerto
     enabled: true

@@ -36,3 +46,8 @@
     target:
       schema: Red
       table: PUERTO
+    pre_sql:
+      - 'SELECT 1'
+      # - 'TRUNCATE TABLE "Red"."PUERTO"'
+    post_sql:
+      - "SELECT 1"

@@ -8,9 +8,9 @@ import (
 )

 type RetryConfig struct {
 	Attempts    int `yaml:"attempts"`
 	BaseDelayMs int `yaml:"base_delay_ms"`
 	MaxDelayMs  int `yaml:"max_delay_ms"`
 	MaxJitterMs int `yaml:"max_jitter_ms"`
 }

@@ -22,6 +22,8 @@ type JobConfig struct {
 	BatchesPerPartition int         `yaml:"batches_per_partition"`
 	TruncateTarget      bool        `yaml:"truncate_target"`
 	TruncateMethod      string      `yaml:"truncate_method"`
+	MaxPartitionErrrors int         `yaml:"max_partition_errrors"`
+	MaxChunkErrors      int         `yaml:"max_chunk_errors"`
 	Retry               RetryConfig `yaml:"retry"`
 	RowsPerPartition    int64
 }

@@ -48,6 +50,12 @@ type Job struct {
 	PreSQL    []string `yaml:"pre_sql"`
 	PostSQL   []string `yaml:"post_sql"`
 	JobConfig `yaml:",inline"`
+	Range     struct {
+		Min            int64 `yaml:"min"`
+		Max            int64 `yaml:"max"`
+		IsMinInclusive bool  `yaml:"is_min_inclusive"`
+		IsMaxInclusive bool  `yaml:"is_max_inclusive"`
+	}
 }

 type MigrationConfig struct {

@@ -74,6 +82,8 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
 	c.MaxParallelWorkers = raw.MaxParallelWorkers
 	c.Defaults = raw.Defaults
+	c.SourceDbType = raw.SourceDbType
+	c.TargetDbType = raw.TargetDbType
 	c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)

 	for _, node := range raw.Jobs {
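For reference, how the new range block maps onto a struct, as a minimal self-contained sketch assuming gopkg.in/yaml.v3 (the library the config package already uses for yaml.Node); jobRange is a hypothetical stand-in for the anonymous Range struct above:

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// jobRange mirrors the Range block added to Job above.
type jobRange struct {
	Min            int64 `yaml:"min"`
	Max            int64 `yaml:"max"`
	IsMinInclusive bool  `yaml:"is_min_inclusive"`
	IsMaxInclusive bool  `yaml:"is_max_inclusive"`
}

func main() {
	src := `
min: 1000000
max: 2000000
is_min_inclusive: false
is_max_inclusive: true
`
	var r jobRange
	if err := yaml.Unmarshal([]byte(src), &r); err != nil {
		panic(err)
	}

	// With these flags the extractor would scan the half-open range (1000000, 2000000].
	fmt.Printf("(%d, %d] min-incl=%v max-incl=%v\n", r.Min, r.Max, r.IsMinInclusive, r.IsMaxInclusive)
}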

@@ -1,13 +1,7 @@
 package custom_errors

 import (
-	"context"
-	"fmt"
-	"sync"
-
-	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
-	"github.com/google/uuid"
 )

 type ExtractorError struct {

@@ -20,82 +14,3 @@
 func (e *ExtractorError) Error() string {
 	return e.Msg
 }
-
-func ExtractorErrorHandler(
-	ctx context.Context,
-	retryConfig config.RetryConfig,
-	chErrorsIn <-chan ExtractorError,
-	chPartitionsOut chan<- models.Partition,
-	chJobErrorsOut chan<- JobError,
-	wgActivePartitions *sync.WaitGroup,
-) {
-	for {
-		if ctx.Err() != nil {
-			return
-		}
-
-		select {
-		case <-ctx.Done():
-			return
-		case err, ok := <-chErrorsIn:
-			if !ok {
-				return
-			}
-
-			if err.Partition.RetryCounter >= retryConfig.Attempts {
-				wgActivePartitions.Done()
-
-				jobError := JobError{
-					ShouldCancelJob: false,
-					Msg:             fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
-					Prev:            &err,
-				}
-
-				select {
-				case chJobErrorsOut <- jobError:
-				case <-ctx.Done():
-					return
-				}
-
-				continue
-			} else {
-				jobError := JobError{
-					ShouldCancelJob: false,
-					Msg:             fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter),
-					Prev:            &err,
-				}
-
-				select {
-				case chJobErrorsOut <- jobError:
-				case <-ctx.Done():
-					return
-				}
-			}
-
-			newPartition := err.Partition
-			newPartition.RetryCounter++
-
-			delay := computeBackoffDelay(
-				newPartition.RetryCounter,
-				retryConfig.BaseDelayMs,
-				retryConfig.MaxDelayMs,
-				retryConfig.MaxJitterMs,
-			)
-
-			if err.HasLastId {
-				newPartition.ParentId = err.Partition.Id
-				newPartition.Id = uuid.New()
-				newPartition.LowerLimit = err.LastId
-				newPartition.IsLowerLimitInclusive = false
-			}
-
-			requeueWithBackoff(ctx, delay, func() {
-				select {
-				case chPartitionsOut <- newPartition:
-				case <-ctx.Done():
-					return
-				}
-			})
-		}
-	}
-}
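computeBackoffDelay and requeueWithBackoff are referenced by the removed handler but defined elsewhere in the repo and not shown in this diff. A minimal sketch of a compatible backoff helper; the exponential-growth-plus-jitter semantics are an assumption inferred only from the parameter names at the call site:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Sketch of a compatible computeBackoffDelay; the real helper is not in
// this diff, so exponential growth with additive jitter is an assumption.
func computeBackoffDelay(attempt, baseDelayMs, maxDelayMs, maxJitterMs int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	delayMs := baseDelayMs << (attempt - 1) // base * 2^(attempt-1)
	if delayMs <= 0 || delayMs > maxDelayMs { // <= 0 guards shift overflow
		delayMs = maxDelayMs
	}
	if maxJitterMs > 0 {
		delayMs += rand.Intn(maxJitterMs)
	}
	return time.Duration(delayMs) * time.Millisecond
}

func main() {
	// base_delay_ms: 500 comes from the config above; the cap and jitter
	// values here are placeholders.
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Println(attempt, computeBackoffDelay(attempt, 500, 3000, 200))
	}
}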

@@ -21,11 +21,14 @@ func (e *LoaderError) Error() string {
 func LoaderErrorHandler(
 	ctx context.Context,
 	retryConfig config.RetryConfig,
+	maxChunkErrors int,
 	chErrorsIn <-chan LoaderError,
 	chBatchesOut chan<- models.Batch,
 	chJobErrorsOut chan<- JobError,
 	wgActiveBatches *sync.WaitGroup,
 ) {
+	definitiveErrors := 0
+
 	for {
 		if ctx.Err() != nil {
 			return

@@ -42,6 +45,7 @@
 			if err.Batch.RetryCounter >= retryConfig.Attempts {
 				wgActiveBatches.Done()
+				definitiveErrors++

 				jobError := JobError{
 					ShouldCancelJob: false,
 					Msg:             fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),

@@ -54,6 +58,20 @@
 					return
 				}

+				if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
+					fatalError := JobError{
+						ShouldCancelJob: true,
+						Msg:             fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
+						Prev:            &err,
+					}
+
+					select {
+					case chJobErrorsOut <- fatalError:
+					case <-ctx.Done():
+						return
+					}
+				}
+
 				continue
 			} else {
 				jobError := JobError{

@@ -0,0 +1,19 @@
package dbwrapper

import "fmt"

type Factory func() DbWrapper

var drivers = make(map[string]Factory)

func Register(name string, factory Factory) {
	drivers[name] = factory
}

func New(driverType string) (DbWrapper, error) {
	factory, ok := drivers[driverType]
	if !ok {
		return nil, fmt.Errorf("driver not yet supported: %s", driverType)
	}

	return factory(), nil
}
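How the registry is meant to be consumed, as a minimal sketch: the driver files below live in the same package and register themselves in init, so importing db-wrapper is enough. The connection string here is a placeholder:

package main

import (
	"context"
	"log"

	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
)

func main() {
	ctx := context.Background()

	// "postgres" was registered by the postgres driver's init below;
	// an unknown name yields "driver not yet supported: ...".
	db, err := dbwrapper.New("postgres")
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder URL; any pgx-compatible connection string works here.
	if err := db.Connect(ctx, "postgres://user:pass@localhost:5432/app"); err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}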

@@ -0,0 +1,176 @@
package dbwrapper

import (
	"context"
	"database/sql"
	"fmt"

	mssql "github.com/microsoft/go-mssqldb"
)

func init() {
	Register("sqlserver", func() DbWrapper {
		return &mssqlDbWrapper{dialect: "sqlserver"}
	})
}

type mssqlRowResult struct {
	row *sql.Row
}

func (mr *mssqlRowResult) Scan(dest ...any) error {
	return mr.row.Scan(dest...)
}

type mssqlRowsResult struct {
	columns []string
	rows    *sql.Rows
}

func (mr *mssqlRowsResult) Close() error {
	return mr.rows.Close()
}

func (mr *mssqlRowsResult) Columns() ([]string, error) {
	if mr.columns != nil {
		return mr.columns, nil
	}

	return mr.rows.Columns()
}

func (mr *mssqlRowsResult) Err() error {
	return mr.rows.Err()
}

func (mr *mssqlRowsResult) Next() bool {
	return mr.rows.Next()
}

func (mr *mssqlRowsResult) Scan(dest ...any) error {
	return mr.rows.Scan(dest...)
}

func (mr *mssqlRowsResult) Values() ([]any, error) {
	columns, err := mr.Columns()
	if err != nil {
		return nil, err
	}

	rowValues := make([]any, len(columns))
	scanArgs := make([]any, len(columns))
	for i := range rowValues {
		scanArgs[i] = &rowValues[i]
	}

	if err := mr.rows.Scan(scanArgs...); err != nil {
		return nil, err
	}

	return rowValues, nil
}

type mssqlDbWrapper struct {
	db      *sql.DB
	dialect string
}

func (mw *mssqlDbWrapper) Connect(ctx context.Context, dbUrl string) error {
	db, err := sql.Open("sqlserver", dbUrl)
	if err != nil {
		return err
	}

	if err := db.PingContext(ctx); err != nil {
		if err := db.Close(); err != nil {
			return err
		}
		return err
	}

	mw.db = db
	return nil
}

func (mw *mssqlDbWrapper) Close() error {
	return mw.db.Close()
}

func (mw *mssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
	result, execErr := mw.db.ExecContext(ctx, query, args...)
	if execErr != nil {
		return ExecResult{}, execErr
	}

	affectedRows, err := result.RowsAffected()
	if err != nil {
		return ExecResult{}, err
	}

	return ExecResult{AffectedRows: affectedRows}, nil
}

func (mw *mssqlDbWrapper) GetDialect() string {
	return mw.dialect
}

func (mw *mssqlDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
	rows, err := mw.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}

	return &mssqlRowsResult{columns: nil, rows: rows}, nil
}

func (mw *mssqlDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
	row := mw.db.QueryRowContext(ctx, query, args...)
	return &mssqlRowResult{row: row}
}

func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
	tx, err := mw.db.BeginTx(ctx, nil)
	if err != nil {
		return 0, err
	}

	fullTableName := fmt.Sprintf("[%s].[%s]", schema, table)

	stmt, err := tx.PrepareContext(ctx, mssql.CopyIn(fullTableName, mssql.BulkOptions{}, columnNames...))
	if err != nil {
		tx.Rollback()
		return 0, err
	}

	for _, row := range rows {
		_, err = stmt.ExecContext(ctx, row...)
		if err != nil {
			stmt.Close()
			tx.Rollback()
			return 0, err
		}
	}

	result, err := stmt.ExecContext(ctx)
	if err != nil {
		stmt.Close()
		tx.Rollback()
		return 0, err
	}

	if err := stmt.Close(); err != nil {
		tx.Rollback()
		return 0, err
	}

	if err := tx.Commit(); err != nil {
		return 0, err
	}

	rowsAffected, raErr := result.RowsAffected()
	if raErr != nil {
		return 0, nil
	}

	return rowsAffected, nil
}
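SaveMassive drives go-mssqldb's bulk copy: mssql.CopyIn yields a sentinel statement, each ExecContext with arguments buffers one row, and the final argument-less ExecContext flushes the copy. A minimal caller sketch; schema, table, and column names are placeholders:

package main

import (
	"context"
	"log"

	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
)

// bulkCopyDemo is a sketch only; "dbo.DEMO(id, name)" is a placeholder table.
func bulkCopyDemo(ctx context.Context, db dbwrapper.DbWrapper) error {
	rows := [][]any{
		{int64(1), "first"},
		{int64(2), "second"},
	}

	// One transaction wraps the whole copy, so a failed row aborts everything.
	inserted, err := db.SaveMassive(ctx, "dbo", "DEMO", []string{"id", "name"}, rows)
	if err != nil {
		return err
	}

	log.Printf("bulk-copied %d rows", inserted)
	return nil
}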

@@ -0,0 +1,128 @@
package dbwrapper

import (
	"context"
	"errors"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

func init() {
	Register("postgres", func() DbWrapper {
		return &postgresDbWrapper{dialect: "postgres"}
	})
}

type postgresRowResult struct {
	row pgx.Row
}

func (pr *postgresRowResult) Scan(dest ...any) error {
	return pr.row.Scan(dest...)
}

type postgresRowsResult struct {
	columns []string
	rows    pgx.Rows
}

func (pr *postgresRowsResult) Close() error {
	pr.rows.Close()
	return nil
}

func (pr *postgresRowsResult) Columns() ([]string, error) {
	if pr.columns != nil {
		return pr.columns, nil
	}

	rawColumns := pr.rows.FieldDescriptions()
	if rawColumns == nil {
		return nil, errors.New("error retrieving columns")
	}

	columns := make([]string, 0, len(rawColumns))
	for _, rc := range rawColumns {
		columns = append(columns, rc.Name)
	}

	return columns, nil
}

func (pr *postgresRowsResult) Err() error {
	return pr.rows.Err()
}

func (pr *postgresRowsResult) Next() bool {
	return pr.rows.Next()
}

func (pr *postgresRowsResult) Scan(dest ...any) error {
	return pr.rows.Scan(dest...)
}

func (pr *postgresRowsResult) Values() ([]any, error) {
	return pr.rows.Values()
}

type postgresDbWrapper struct {
	db      *pgxpool.Pool
	dialect string
}

func (pw *postgresDbWrapper) Connect(ctx context.Context, dbUrl string) error {
	pool, err := pgxpool.New(ctx, dbUrl)
	if err != nil {
		return err
	}

	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return err
	}

	pw.db = pool
	return nil
}

func (pw *postgresDbWrapper) Close() error {
	pw.db.Close()
	return nil
}

func (pw *postgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (ExecResult, error) {
	result, err := pw.db.Exec(ctx, query, args...)
	if err != nil {
		return ExecResult{}, err
	}

	return ExecResult{AffectedRows: result.RowsAffected()}, nil
}

func (pw *postgresDbWrapper) GetDialect() string {
	return pw.dialect
}

func (pw *postgresDbWrapper) Query(ctx context.Context, query string, args ...any) (RowsResult, error) {
	rows, err := pw.db.Query(ctx, query, args...)
	if err != nil {
		return nil, err
	}

	return &postgresRowsResult{columns: nil, rows: rows}, nil
}

func (pw *postgresDbWrapper) QueryRow(ctx context.Context, query string, args ...any) RowResult {
	row := pw.db.QueryRow(ctx, query, args...)
	return &postgresRowResult{row: row}
}

func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error) {
	affectedRows, err := pw.db.CopyFrom(ctx, pgx.Identifier{schema, table}, columnNames, pgx.CopyFromRows(rows))
	if err != nil {
		return 0, err
	}

	return affectedRows, nil
}

@@ -0,0 +1,35 @@
package dbwrapper

import (
	"context"
	"errors"
)

var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")

type ExecResult struct {
	AffectedRows int64
}

type RowsResult interface {
	Close() error
	Columns() ([]string, error)
	Err() error
	Next() bool
	Scan(dest ...any) error
	Values() ([]any, error)
}

type RowResult interface {
	Scan(dest ...any) error
}

type DbWrapper interface {
	Close() error
	Connect(ctx context.Context, dbUrl string) error
	Exec(ctx context.Context, query string, args ...any) (ExecResult, error)
	GetDialect() string
	Query(ctx context.Context, query string, args ...any) (RowsResult, error)
	QueryRow(ctx context.Context, query string, args ...any) RowResult
	SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
}
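Typical consumption of the interface mirrors database/sql: iterate with Next, pull untyped values with Values, and check Err after the loop. A minimal sketch with a placeholder query:

package main

import (
	"context"
	"log"

	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
)

// dumpRows is a sketch; the table and columns are placeholders.
func dumpRows(ctx context.Context, db dbwrapper.DbWrapper) error {
	rows, err := db.Query(ctx, `SELECT id, name FROM demo`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		// Values scans every column into []any, so callers need no
		// per-column type knowledge up front.
		values, err := rows.Values()
		if err != nil {
			return err
		}
		log.Println(values...)
	}

	// Err reports any failure encountered during iteration.
	return rows.Err()
}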

@@ -1,28 +0,0 @@
package db

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
	pool, err := pgxpool.New(ctx, dbURL)
	if err != nil {
		return nil, fmt.Errorf("unable to connect to database: %w", err)
	}

	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("unable to ping database: %w", err)
	}

	return pool, nil
}

func Close(pool *pgxpool.Pool) {
	if pool != nil {
		pool.Close()
	}
}

@@ -0,0 +1,92 @@
package extractors

import (
	"context"
	"errors"
	"slices"
	"strings"
	"sync"
	"sync/atomic"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)

func Consume(
	ctx context.Context,
	extractor etl.Extractor,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	chPartitionsIn <-chan models.Partition,
	chBatchesOut chan<- models.Batch,
	chErrorsOut chan<- custom_errors.JobError,
	wgActivePartitions *sync.WaitGroup,
	rowsRead *int64,
) {
	indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
	})

	if indexPrimaryKey == -1 {
		select {
		case <-ctx.Done():
			return
		case chErrorsOut <- custom_errors.JobError{
			ShouldCancelJob: true,
			Msg:             "Primary key not found in provided columns",
		}:
		}
		return
	}

	for {
		if ctx.Err() != nil {
			return
		}

		select {
		case <-ctx.Done():
			return
		case partition, ok := <-chPartitionsIn:
			if !ok {
				return
			}

			rowsReadResult, err := extractWithRetries(
				ctx,
				extractor,
				tableInfo,
				columns,
				batchSize,
				partition,
				indexPrimaryKey,
				chBatchesOut,
			)
			wgActivePartitions.Done()

			if rowsReadResult > 0 {
				atomic.AddInt64(rowsRead, rowsReadResult)
			}

			if err != nil {
				var jobError *custom_errors.JobError
				if errors.As(err, &jobError) {
					select {
					case <-ctx.Done():
						return
					case chErrorsOut <- *jobError:
					}
				} else {
					select {
					case <-ctx.Done():
						return
					case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
					}
				}
			}
		}
	}
}
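Consume relies on errors.As to forward typed *custom_errors.JobError values intact while wrapping everything else as non-fatal. That dispatch in isolation, as a minimal sketch with a stand-in error type:

package main

import (
	"errors"
	"fmt"
)

// jobError is a stand-in for custom_errors.JobError, reduced to what the
// sketch needs.
type jobError struct {
	ShouldCancelJob bool
	Msg             string
}

func (e *jobError) Error() string { return e.Msg }

func handle(err error) {
	var je *jobError
	if errors.As(err, &je) {
		// Matched: the typed error is forwarded as-is, flags preserved.
		fmt.Printf("typed job error (cancel=%v): %s\n", je.ShouldCancelJob, je.Msg)
		return
	}
	// Anything else would be wrapped into a non-fatal job error.
	fmt.Printf("wrapped as non-fatal: %v\n", err)
}

func main() {
	handle(&jobError{ShouldCancelJob: true, Msg: "Primary key not found in provided columns"})
	handle(errors.New("connection reset by peer"))
}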

@@ -0,0 +1,70 @@
package extractors

import (
	"context"
	"errors"
	"fmt"
	"time"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
)

func extractWithRetries(
	ctx context.Context,
	extractor etl.Extractor,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	chBatchesOut chan<- models.Batch,
) (int64, error) {
	var totalRowsRead int64
	delay := time.Duration(time.Second * 1)
	currentPartition := partition

	for {
		rowsRead, err := extractor.Exec(
			ctx,
			tableInfo,
			columns,
			batchSize,
			currentPartition,
			indexPrimaryKey,
			chBatchesOut,
		)
		totalRowsRead += rowsRead

		if err == nil {
			return totalRowsRead, nil
		}

		var exError *custom_errors.ExtractorError
		if errors.As(err, &exError) {
			currentPartition.RetryCounter++
			if currentPartition.RetryCounter > 3 {
				return totalRowsRead, &custom_errors.JobError{
					Msg:  fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
					Prev: err,
				}
			}

			if exError.HasLastId {
				currentPartition.ParentId = exError.Partition.Id
				currentPartition.Id = uuid.New()
				currentPartition.Range.Min = exError.LastId
				currentPartition.Range.IsMinInclusive = false
			}

			time.Sleep(delay)
			continue
		}

		return totalRowsRead, err
	}
}

@@ -0,0 +1,64 @@
package extractors

import (
	"context"
	"fmt"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
)

func errorFromLastPartitionRow(
	lastRow models.UnknownRowValues,
	indexPrimaryKey int,
	partition models.Partition,
	previousError error,
) error {
	lastIdRawValue := lastRow[indexPrimaryKey]

	lastId, ok := convert.ToInt64(lastIdRawValue)
	if !ok {
		currentPartition := partition
		currentPartition.RetryCounter = 3

		return &custom_errors.ExtractorError{
			Partition: currentPartition,
			HasLastId: true,
			Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
		}
	}

	return &custom_errors.ExtractorError{
		Partition: partition,
		HasLastId: true,
		LastId:    lastId,
		Msg:       previousError.Error(),
	}
}

func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error {
	select {
	case chBatchesOut <- batch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func flush(
	ctx context.Context,
	partition *models.Partition,
	batchSize int,
	batchRows []models.UnknownRowValues,
	chBatchesOut chan<- models.Batch,
) error {
	if len(batchRows) == 0 {
		return nil
	}

	batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows}
	batchRows = make([]models.UnknownRowValues, 0, batchSize)

	return sendBatch(ctx, chBatchesOut, batch)
}

@@ -3,26 +3,20 @@ package extractors

 import (
 	"context"
 	"database/sql"
-	"errors"
 	"fmt"
-	"slices"
 	"strings"
-	"sync"
-	"sync/atomic"

 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
-	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
-	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
-	"github.com/google/uuid"
 )

 type MssqlExtractor struct {
-	db *sql.DB
+	db dbwrapper.DbWrapper
 }

-func NewMssqlExtractor(db *sql.DB) etl.Extractor {
+func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
 	return &MssqlExtractor{db: db}
 }

@@ -70,35 +64,7 @@ func buildExtractQueryMssql(
 	return sbQuery.String()
 }

-func errorFromLastRow(
-	lastRow models.UnknownRowValues,
-	indexPrimaryKey int,
-	partition *models.Partition,
-	previousError error,
-) *custom_errors.ExtractorError {
-	lastIdRawValue := lastRow[indexPrimaryKey]
-
-	lastId, ok := convert.ToInt64(lastIdRawValue)
-	if !ok {
-		currentPartition := *partition
-		currentPartition.RetryCounter = 3
-
-		return &custom_errors.ExtractorError{
-			Partition: currentPartition,
-			HasLastId: true,
-			Msg:       fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
-		}
-	}
-
-	return &custom_errors.ExtractorError{
-		Partition: *partition,
-		HasLastId: true,
-		LastId:    lastId,
-		Msg:       previousError.Error(),
-	}
-}
-
-func (mssqlEx *MssqlExtractor) ProcessPartition(
+func (mssqlEx *MssqlExtractor) Exec(
 	ctx context.Context,
 	tableInfo config.SourceTableInfo,
 	columns []models.ColumnType,

@@ -106,172 +72,50 @@
 	partition models.Partition,
 	indexPrimaryKey int,
 	chBatchesOut chan<- models.Batch,
-	rowsRead *int64,
-) error {
-	query := buildExtractQueryMssql(tableInfo, columns, partition.ShouldUseRange, partition.IsLowerLimitInclusive)
+) (int64, error) {
+	query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive)

 	var queryArgs []any
-	if partition.ShouldUseRange {
-		queryArgs = append(queryArgs,
-			sql.Named("min", partition.LowerLimit),
-			sql.Named("max", partition.UpperLimit),
-		)
+	if partition.HasRange {
+		queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min), sql.Named("max", partition.Range.Max))
 	}

-	rows, err := mssqlEx.db.QueryContext(ctx, query, queryArgs...)
+	rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
 	if err != nil {
-		return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
+		return 0, err
 	}
 	defer rows.Close()

 	batchRows := make([]models.UnknownRowValues, 0, batchSize)
+	var rowsRead int64 = 0

 	for rows.Next() {
-		rowValues := make([]any, len(columns))
-		scanArgs := make([]any, len(columns))
-		for i := range rowValues {
-			scanArgs[i] = &rowValues[i]
-		}
-
-		if err := rows.Scan(scanArgs...); err != nil {
+		values, err := rows.Values()
+		if err != nil {
 			if len(batchRows) == 0 {
-				return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
+				return rowsRead, err
+			}
+
+			if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
+				return rowsRead, err
 			}

 			lastRow := batchRows[len(batchRows)-1]
-
-			select {
-			case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
-			case <-ctx.Done():
-				return nil
-			}
-			atomic.AddInt64(rowsRead, int64(len(batchRows)))
-
-			return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err)
+			return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err)
 		}

-		batchRows = append(batchRows, rowValues)
+		rowsRead++
+		batchRows = append(batchRows, values)

 		if len(batchRows) >= batchSize {
-			select {
-			case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
-			case <-ctx.Done():
-				return nil
+			if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
+				return rowsRead, err
 			}
-			atomic.AddInt64(rowsRead, int64(len(batchRows)))
-
-			batchRows = make([]models.UnknownRowValues, 0, batchSize)
 		}
 	}

-	if err := rows.Err(); err != nil {
-		if errors.Is(err, ctx.Err()) {
-			return ctx.Err()
-		}
-
-		if len(batchRows) == 0 {
-			return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
-		}
-
-		lastRow := batchRows[len(batchRows)-1]
-		return errorFromLastRow(lastRow, indexPrimaryKey, &partition, err)
+	if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
+		return rowsRead, err
 	}

-	if len(batchRows) > 0 {
-		select {
-		case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
-		case <-ctx.Done():
-			return nil
-		}
-		atomic.AddInt64(rowsRead, int64(len(batchRows)))
-	}
-
-	return nil
-}
-
-func (mssqlEx *MssqlExtractor) Exec(
-	ctx context.Context,
-	tableInfo config.SourceTableInfo,
-	columns []models.ColumnType,
-	batchSize int,
-	chPartitionsIn <-chan models.Partition,
-	chBatchesOut chan<- models.Batch,
-	chErrorsOut chan<- custom_errors.ExtractorError,
-	chJobErrorsOut chan<- custom_errors.JobError,
-	wgActivePartitions *sync.WaitGroup,
-	rowsRead *int64,
-) {
-	indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
-		return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
-	})
-
-	if indexPrimaryKey == -1 {
-		select {
-		case <-ctx.Done():
-			return
-		case chJobErrorsOut <- custom_errors.JobError{
-			ShouldCancelJob: true,
-			Msg:             "Primary key not found in provided columns",
-		}:
-		}
-		return
-	}
-
-	for {
-		if ctx.Err() != nil {
-			return
-		}
-
-		select {
-		case <-ctx.Done():
-			return
-		case partition, ok := <-chPartitionsIn:
-			if !ok {
-				return
-			}
-
-			err := mssqlEx.ProcessPartition(
-				ctx,
-				tableInfo,
-				columns,
-				batchSize,
-				partition,
-				indexPrimaryKey,
-				chBatchesOut,
-				rowsRead,
-			)
-			if err != nil {
-				var exError *custom_errors.ExtractorError
-				var jobError *custom_errors.JobError
-
-				if errors.As(err, &exError) {
-					select {
-					case <-ctx.Done():
-						return
-					case chErrorsOut <- *exError:
-					}
-				} else if errors.As(err, &jobError) {
-					select {
-					case <-ctx.Done():
-						return
-					case chJobErrorsOut <- *jobError:
-					}
-				} else {
-					select {
-					case <-ctx.Done():
-						return
-					case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
-					}
-				}
-
-				continue
-			}
-
-			wgActivePartitions.Done()
-		}
-	}
+	return rowsRead, rows.Err()
 }

@@ -5,23 +5,21 @@ import (
 	"errors"
 	"fmt"
 	"strings"
-	"sync"
-	"sync/atomic"

 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 	"github.com/google/uuid"
-	"github.com/jackc/pgx/v5/pgxpool"
 )

 type PostgresExtractor struct {
-	db *pgxpool.Pool
+	db dbwrapper.DbWrapper
 }

-func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor {
-	return &PostgresExtractor{db: pool}
+func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
+	return &PostgresExtractor{db: db}
 }

@@ -52,7 +50,7 @@ func buildExtractQueryPostgres(
 	return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
 }

-func (postgresEx *PostgresExtractor) ProcessPartition(
+func (postgresEx *PostgresExtractor) Exec(
 	ctx context.Context,
 	tableInfo config.SourceTableInfo,
 	columns []models.ColumnType,

@@ -60,17 +58,17 @@
 	partition models.Partition,
 	indexPrimaryKey int,
 	chBatchesOut chan<- models.Batch,
-	rowsRead *int64,
-) error {
+) (int64, error) {
 	query := buildExtractQueryPostgres(tableInfo, columns)

-	if partition.ShouldUseRange {
-		return errors.New("Batch config not yet supported")
+	if partition.HasRange {
+		return 0, errors.New("Batch config not yet supported")
 	}

+	var rowsRead int64 = 0
+
 	rows, err := postgresEx.db.Query(ctx, query)
 	if err != nil {
-		return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
+		return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
 	}
 	defer rows.Close()

@@ -79,8 +77,9 @@
 	for rows.Next() {
 		values, err := rows.Values()
 		if err != nil {
-			return errors.New("Unexpected error reading rows from source")
+			return rowsRead, errors.New("Unexpected error reading rows from source")
 		}
+		rowsRead++

 		batchRows = append(batchRows, values)

@@ -88,41 +87,24 @@
 			select {
 			case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 			case <-ctx.Done():
-				return nil
+				return rowsRead, ctx.Err()
 			}
-			atomic.AddInt64(rowsRead, int64(len(batchRows)))

 			batchRows = make([]models.UnknownRowValues, 0, batchSize)
 		}
 	}

 	if err := rows.Err(); err != nil {
-		return errors.New("Unexpected error reading rows from source")
+		return rowsRead, errors.New("Unexpected error reading rows from source")
 	}

 	if len(batchRows) > 0 {
 		select {
 		case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
 		case <-ctx.Done():
-			return nil
+			return rowsRead, nil
 		}
-		atomic.AddInt64(rowsRead, int64(len(batchRows)))
 	}

-	return nil
-}
-
-func (postgresEx *PostgresExtractor) Exec(
-	ctx context.Context,
-	tableInfo config.SourceTableInfo,
-	columns []models.ColumnType,
-	batchSize int,
-	chPartitionsIn <-chan models.Partition,
-	chBatchesOut chan<- models.Batch,
-	chErrorsOut chan<- custom_errors.ExtractorError,
-	chJobErrorsOut chan<- custom_errors.JobError,
-	wgActivePartitions *sync.WaitGroup,
-	rowsRead *int64,
-) {
-}
+	return rowsRead, nil
 }

@@ -1 +0,0 @@
-package extractors

@@ -9,19 +9,18 @@ import (
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
-	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgconn"
-	"github.com/jackc/pgx/v5/pgxpool"
 )

 type PostgresLoader struct {
-	db *pgxpool.Pool
+	db dbwrapper.DbWrapper
 }

-func NewPostgresLoader(pool *pgxpool.Pool) etl.Loader {
-	return &PostgresLoader{db: pool}
+func NewPostgresLoader(db dbwrapper.DbWrapper) etl.Loader {
+	return &PostgresLoader{db: db}
 }

 func mapSlice[T any, V any](input []T, mapper func(T) V) []V {

@@ -40,12 +39,12 @@
 	colNames []string,
 	batch models.Batch,
 ) (int, error) {
-	tableId := pgx.Identifier{tableInfo.Schema, tableInfo.Table}
-	_, err := postgresLd.db.CopyFrom(
+	_, err := postgresLd.db.SaveMassive(
 		ctx,
-		tableId,
+		tableInfo.Schema,
+		tableInfo.Table,
 		colNames,
-		pgx.CopyFromRows(batch.Rows),
+		batch.Rows,
 	)

 	if err != nil {

@@ -54,7 +53,7 @@
 		if pgErr.Code == "23505" {
 			return 0, &custom_errors.JobError{
 				ShouldCancelJob: true,
-				Msg:             fmt.Sprintf("Fatal error in table %s", tableId.Sanitize()),
+				Msg:             fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
 				Prev:            err,
 			}
 		}

@@ -23,9 +23,9 @@ func PartitionRangeGenerator(
 	if rowsCount <= rowsPerPartition {
 		return []models.Partition{{
-			Id:             uuid.New(),
-			ShouldUseRange: false,
-			RetryCounter:   0,
+			Id:           uuid.New(),
+			HasRange:     false,
+			RetryCounter: 0,
 		}}, nil
 	}

@@ -8,16 +8,17 @@ import (
 	"time"

 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
 	"github.com/google/uuid"
 )

 type MssqlTableAnalyzer struct {
-	db *sql.DB
+	db dbwrapper.DbWrapper
 }

-func NewMssqlTableAnalyzer(db *sql.DB) etl.TableAnalyzer {
+func NewMssqlTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
 	return &MssqlTableAnalyzer{db: db}
 }

@@ -35,7 +36,7 @@ JOIN sys.types t ON c.user_type_id = t.user_type_id
 LEFT JOIN sys.types bt ON t.is_user_defined = 1 AND bt.user_type_id = t.system_type_id
 JOIN sys.tables st ON c.object_id = st.object_id
 JOIN sys.schemas s ON st.schema_id = s.schema_id
-WHERE s.name = @schema AND st.name = @table AND c.name NOT LIKE 'graph_id%'
+WHERE s.name = @schema AND st.name = @table AND (c.is_hidden = 0 OR (c.graph_type IS NOT NULL AND c.name LIKE '$%'))
 ORDER BY c.column_id;`

 type rawColumnMssql struct {

@@ -142,7 +143,7 @@ func (ta *MssqlTableAnalyzer) QueryColumnTypes(
 	localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
 	defer cancel()

-	rows, err := ta.db.QueryContext(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
+	rows, err := ta.db.Query(localCtx, mssqlColumnMetadataQuery, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table))
 	if err != nil {
 		return nil, err
 	}

@@ -183,11 +184,11 @@ JOIN sys.partitions p ON t.object_id = p.object_id
 WHERE s.name = @schema AND t.name = @table AND p.index_id IN (0, 1)
 GROUP BY t.name`

-	ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
+	ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
 	defer cancel()

 	var rowsCount int64
-	err := ta.db.QueryRowContext(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
+	err := ta.db.QueryRow(ctxTimeout, query, sql.Named("schema", tableInfo.Schema), sql.Named("table", tableInfo.Table)).Scan(&rowsCount)
 	if err != nil {
 		return 0, err
 	}

@@ -215,10 +216,10 @@ ORDER BY batch_id`,
 		tableInfo.Schema,
 		tableInfo.Table)

-	ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*20)
+	ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
 	defer cancel()

-	rows, err := ta.db.QueryContext(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
+	rows, err := ta.db.Query(ctxTimeout, query, sql.Named("maxPartitions", maxPartitions))
 	if err != nil {
 		return nil, err
 	}

@@ -228,13 +229,15 @@
 	for rows.Next() {
 		partition := models.Partition{
-			Id:                    uuid.New(),
-			ShouldUseRange:        true,
-			RetryCounter:          0,
-			IsLowerLimitInclusive: true,
+			Id:           uuid.New(),
+			HasRange:     true,
+			RetryCounter: 0,
+			Range: models.PartitionRange{
+				IsMinInclusive: true,
+			},
 		}

-		if err := rows.Scan(&partition.LowerLimit, &partition.UpperLimit); err != nil {
+		if err := rows.Scan(&partition.Range.Min, &partition.Range.Max); err != nil {
 			return nil, err
 		}

@@ -6,16 +6,16 @@ import (
 	"time"

 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
+	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
-	"github.com/jackc/pgx/v5/pgxpool"
 )

 type PostgresTableAnalyzer struct {
-	db *pgxpool.Pool
+	db dbwrapper.DbWrapper
 }

-func NewPostgresTableAnalyzer(db *pgxpool.Pool) etl.TableAnalyzer {
+func NewPostgresTableAnalyzer(db dbwrapper.DbWrapper) etl.TableAnalyzer {
 	return &PostgresTableAnalyzer{db: db}
 }

@@ -125,7 +125,7 @@ func (ta *PostgresTableAnalyzer) QueryColumnTypes(
 	ctx context.Context,
 	tableInfo config.TableInfo,
 ) ([]models.ColumnType, error) {
-	localCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
+	localCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
 	defer cancel()

 	rows, err := ta.db.Query(localCtx, postgresColumnMetadataQuery, tableInfo.Schema, tableInfo.Table)

@@ -74,6 +74,10 @@ func (mssqlTr *MssqlTransformer) ProcessBatch(
 			}
 		}

+		if rowValues == nil {
+			continue
+		}
+
 		for _, task := range transformationPlan {
 			val := rowValues[task.Index]
 			if val == nil {

@@ -10,7 +10,7 @@ import (
 )

 type Extractor interface {
-	ProcessPartition(
+	Exec(
 		ctx context.Context,
 		tableInfo config.SourceTableInfo,
 		columns []models.ColumnType,

@@ -18,21 +18,7 @@ type Extractor interface {
 		partition models.Partition,
 		indexPrimaryKey int,
 		chBatchesOut chan<- models.Batch,
-		rowsRead *int64,
-	) error
-
-	Exec(
-		ctx context.Context,
-		tableInfo config.SourceTableInfo,
-		columns []models.ColumnType,
-		batchSize int,
-		chPartitionsIn <-chan models.Partition,
-		chBatchesOut chan<- models.Batch,
-		chErrorsOut chan<- custom_errors.ExtractorError,
-		chJobErrorsOut chan<- custom_errors.JobError,
-		wgActivePartitions *sync.WaitGroup,
-		rowsRead *int64,
-	)
+	) (int64, error)
 }

 type TransformerFunc func(any) (any, error)

@@ -11,12 +11,17 @@ type Batch struct {
 	RetryCounter int
 }

-type Partition struct {
-	Id                    uuid.UUID
-	ParentId              uuid.UUID
-	LowerLimit            int64
-	UpperLimit            int64
-	IsLowerLimitInclusive bool
-	ShouldUseRange        bool
-	RetryCounter          int
+type PartitionRange struct {
+	Min            int64
+	Max            int64
+	IsMinInclusive bool
+	IsMaxInclusive bool
+}
+
+type Partition struct {
+	Id           uuid.UUID
+	ParentId     uuid.UUID
+	Range        PartitionRange
+	HasRange     bool
+	RetryCounter int
 }
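A sketch constructing the new shape; the values mirror the (1000000, 2000000] range from the job config above:

package main

import (
	"fmt"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
)

func main() {
	// Half-open range: min excluded, max included, matching
	// is_min_inclusive: false / is_max_inclusive: true in the YAML.
	p := models.Partition{
		Id:       uuid.New(),
		HasRange: true,
		Range: models.PartitionRange{
			Min:            1000000,
			Max:            2000000,
			IsMinInclusive: false,
			IsMaxInclusive: true,
		},
	}

	fmt.Printf("partition %s covers (%d, %d]\n", p.Id, p.Range.Min, p.Range.Max)
}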

@@ -8,13 +8,32 @@ import (
 	"time"

 	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
-	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
 	log "github.com/sirupsen/logrus"
 )

+func Connect(ctx context.Context, dbURL string) (*pgxpool.Pool, error) {
+	pool, err := pgxpool.New(ctx, dbURL)
+	if err != nil {
+		return nil, fmt.Errorf("unable to connect to database: %w", err)
+	}
+
+	if err := pool.Ping(ctx); err != nil {
+		pool.Close()
+		return nil, fmt.Errorf("unable to ping database: %w", err)
+	}
+
+	return pool, nil
+}
+
+func Close(pool *pgxpool.Pool) {
+	if pool != nil {
+		pool.Close()
+	}
+}
+
 func main() {
 	log.SetFormatter(&log.TextFormatter{
 		FullTimestamp: true,

@@ -27,8 +46,8 @@ func main() {
 	ctxSource, cancel := context.WithTimeout(context.Background(), 20*time.Second)
 	defer cancel()

-	sourcePool, err := db.Connect(ctxSource, config.App.SourceDbUrl)
-	defer db.Close(sourcePool)
+	sourcePool, err := Connect(ctxSource, config.App.SourceDbUrl)
+	defer Close(sourcePool)
 	if err != nil {
 		log.Fatal(err)
 	}

@@ -37,8 +56,8 @@ func main() {
 	ctxTarget, cancel := context.WithTimeout(context.Background(), 20*time.Second)
 	defer cancel()

-	targetPool, err := db.Connect(ctxTarget, config.App.TargetDbUrl)
-	defer db.Close(targetPool)
+	targetPool, err := Connect(ctxTarget, config.App.TargetDbUrl)
+	defer Close(targetPool)
 	if err != nil {
 		log.Fatal(err)
 	}