2 Commits

10 changed files with 145 additions and 15 deletions

View File

@@ -13,5 +13,5 @@ func configureLog() {
         DisableSorting: false,
         PadLevelText:   true,
     })
-    log.SetLevel(log.InfoLevel)
+    log.SetLevel(log.DebugLevel)
 }

View File

@@ -7,6 +7,7 @@ import (
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
@@ -49,8 +50,10 @@ func main() {
         status := "OK"
         if res.Error != nil {
             status = "FAILED"
+            log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v | Error: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration, res.Error)
+        } else {
+            log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
         }
-        log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
         totalProcessed += res.RowsLoaded
         if res.Error != nil {
@@ -91,6 +94,7 @@ func processMigrationJobs(
     chJobs := make(chan config.Job, len(jobs))
     var wgJobs sync.WaitGroup

+    targetDbWrapper := db.NewPostgresDbWrapper(targetDb)
     sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
     targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
     extractor := extractors.NewMssqlExtractor(sourceDb)
@@ -103,6 +107,7 @@ func processMigrationJobs(
             log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
             res := processMigrationJob(
                 ctx,
+                targetDbWrapper,
                 sourceTableAnalyzer,
                 targetTableAnalyzer,
                 extractor,

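The two hunks above thread a single targetDbWrapper from processMigrationJobs into every worker. For orientation, a minimal, self-contained sketch of the fan-out pattern the truncated hunks imply — the Job type and the loop body are stand-ins; only the buffered chJobs channel, the WaitGroup, and the worker index come from the diff:

package main

import (
    "fmt"
    "sync"
)

type Job struct{ Name string } // stand-in for config.Job

func fanOut(jobs []Job, maxWorkers int) {
    chJobs := make(chan Job, len(jobs)) // buffered so the send loop never blocks
    var wgJobs sync.WaitGroup
    for i := 1; i <= maxWorkers; i++ {
        wgJobs.Add(1)
        go func(worker int) {
            defer wgJobs.Done()
            for job := range chJobs { // exits once chJobs is closed and drained
                fmt.Printf("[worker %d] processing job: %s\n", worker, job.Name)
            }
        }(i)
    }
    for _, job := range jobs {
        chJobs <- job
    }
    close(chJobs)
    wgJobs.Wait()
}

func main() {
    fanOut([]Job{{"manzana"}, {"red_puerto"}}, 2)
}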
View File

@@ -8,6 +8,7 @@ import (
     "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
     "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
+    "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
     "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
     "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
     "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
@@ -17,6 +18,8 @@ import (
 func processMigrationJob(
     ctx context.Context,
+    // sourceDbWrapper db.DbWrapper,
+    targetDbWrapper db.DbWrapper,
     sourceTableAnalyzer etl.TableAnalyzer,
     targetTableAnalyzer etl.TableAnalyzer,
     extractor etl.Extractor,
@@ -24,7 +27,7 @@ func processMigrationJob(
     loader etl.Loader,
     job config.Job,
 ) JobResult {
-    jobCtx, cancel := context.WithCancel(ctx)
+    localCtx, cancel := context.WithCancel(ctx)
     defer cancel()

     result := JobResult{
@@ -39,7 +42,7 @@ func processMigrationJob(
     wgQueryColumnTypes.Go(func() error {
         var err error
-        sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(jobCtx, job.SourceTable.TableInfo)
+        sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo)
         if err != nil {
             return err
         }
@@ -49,7 +52,7 @@ func processMigrationJob(
     wgQueryColumnTypes.Go(func() error {
         var err error
-        targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(jobCtx, job.TargetTable.TableInfo)
+        targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo)
         if err != nil {
             return err
         }
@@ -63,8 +66,15 @@ func processMigrationJob(
         return result
     }

+    for _, query := range job.PreSQL {
+        if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
+            result.Error = err
+            return result
+        }
+    }
+
     partitions, err := table_analyzers.PartitionRangeGenerator(
-        jobCtx,
+        localCtx,
         sourceTableAnalyzer,
         job.SourceTable.TableInfo,
         job.SourceTable.PrimaryKey,
@@ -88,7 +98,7 @@ func processMigrationJob(
     var wgLoaders sync.WaitGroup

     go func() {
-        if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
+        if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
             log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
             cancel()
             result.Error = err
@@ -96,16 +106,18 @@ func processMigrationJob(
     }()

     go custom_errors.ExtractorErrorHandler(
-        jobCtx,
+        localCtx,
         job.Retry,
+        job.MaxPartitionErrors,
         chExtractorErrors,
         chPartitions,
         chJobErrors,
         &wgActivePartitions,
     )
     go custom_errors.LoaderErrorHandler(
-        jobCtx,
+        localCtx,
         job.Retry,
+        job.MaxChunkErrors,
         chLoadersErrors,
         chBatchesTransformed,
         chJobErrors,
@@ -118,7 +130,7 @@ func processMigrationJob(
     for range maxExtractors {
         wgExtractors.Go(func() {
             extractor.Exec(
-                jobCtx,
+                localCtx,
                 job.SourceTable,
                 sourceColTypes,
                 job.BatchSize,
@@ -144,7 +156,7 @@ func processMigrationJob(
     for range maxExtractors {
         wgTransformers.Go(func() {
             transformer.Exec(
-                jobCtx,
+                localCtx,
                 sourceColTypes,
                 chBatchesRaw,
                 chBatchesTransformed,
@@ -159,7 +171,7 @@ func processMigrationJob(
     for range job.MaxLoaders {
         wgLoaders.Go(func() {
             loader.Exec(
-                jobCtx,
+                localCtx,
                 job.TargetTable,
                 targetColTypes,
                 chBatchesTransformed,
@@ -202,8 +214,15 @@ func processMigrationJob(
         cancel()
     }()

+    for _, query := range job.PostSQL {
+        if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
+            result.Error = err
+            return result
+        }
+    }
+
     log.Debugf("waiting for local context to be done (%v)", job.Name)
-    <-jobCtx.Done()
+    <-localCtx.Done()
     log.Debugf("local context done (%v)", job.Name)

     if ctx.Err() != nil {

View File

@@ -10,6 +10,8 @@ defaults:
   batches_per_partition: 8
   truncate_target: true
   truncate_method: TRUNCATE # TRUNCATE | DELETE
+  max_partition_errors: 5
+  max_chunk_errors: 5
   retry:
     attempts: 3
     base_delay_ms: 500
@@ -26,6 +28,8 @@ jobs:
     target:
       schema: Cartografia
       table: MANZANA
+    pre_sql:
+      - 'SELECT 1'
   - name: red_puerto
     enabled: true
@@ -36,3 +40,7 @@
     target:
       schema: Red
       table: PUERTO
+    pre_sql:
+      - 'SELECT 1'
+    post_sql:
+      - 'SELECT 1'

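processMigrationJob ranges over job.PreSQL and job.PostSQL, so the new keys presumably bind to plain string slices on the job config struct. That struct is not part of this diff; a sketch of the assumed shape (yaml tags inferred from the keys above, other fields elided):

package config

// Assumed binding for the new hooks — the project's actual Job struct is not
// shown in this diff; only the PreSQL/PostSQL field names appear at the call sites.
type Job struct {
    Name    string   `yaml:"name"`
    Enabled bool     `yaml:"enabled"`
    PreSQL  []string `yaml:"pre_sql"`  // run on the target before extraction starts
    PostSQL []string `yaml:"post_sql"` // run on the target after the pipeline drains
}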
View File

@@ -8,9 +8,9 @@ import (
)
type RetryConfig struct {
Attempts int `yaml:"attempts"`
Attempts int `yaml:"attempts"`
BaseDelayMs int `yaml:"base_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"`
}
@@ -22,6 +22,8 @@ type JobConfig struct {
     BatchesPerPartition int         `yaml:"batches_per_partition"`
     TruncateTarget      bool        `yaml:"truncate_target"`
     TruncateMethod      string      `yaml:"truncate_method"`
+    MaxPartitionErrors  int         `yaml:"max_partition_errors"`
+    MaxChunkErrors      int         `yaml:"max_chunk_errors"`
     Retry               RetryConfig `yaml:"retry"`

     RowsPerPartition int64
 }

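The RetryConfig knobs read like classic exponential backoff with jitter. The retry loop itself is outside this diff, but a plausible way these four fields combine, as a sketch only:

package config

import (
    "math/rand"
    "time"
)

// Delay is a hypothetical helper (not in this commit): exponential backoff
// from BaseDelayMs, capped at MaxDelayMs, plus up to MaxJitterMs of random jitter.
func (rc RetryConfig) Delay(attempt int) time.Duration {
    delayMs := rc.BaseDelayMs << attempt // BaseDelayMs * 2^attempt
    if rc.MaxDelayMs > 0 && delayMs > rc.MaxDelayMs {
        delayMs = rc.MaxDelayMs
    }
    if rc.MaxJitterMs > 0 {
        delayMs += rand.Intn(rc.MaxJitterMs)
    }
    return time.Duration(delayMs) * time.Millisecond
}

With the config file's base_delay_ms of 500 and attempts of 3, that works out to roughly 500 ms, 1 s, and 2 s between tries before the caps and jitter apply.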
View File

@@ -24,11 +24,14 @@ func (e *ExtractorError) Error() string {
 func ExtractorErrorHandler(
     ctx context.Context,
     retryConfig config.RetryConfig,
+    maxPartitionErrors int,
     chErrorsIn <-chan ExtractorError,
     chPartitionsOut chan<- models.Partition,
     chJobErrorsOut chan<- JobError,
     wgActivePartitions *sync.WaitGroup,
 ) {
+    definitiveErrors := 0
+
     for {
         if ctx.Err() != nil {
             return
@@ -45,6 +48,7 @@ func ExtractorErrorHandler(
         if err.Partition.RetryCounter >= retryConfig.Attempts {
             wgActivePartitions.Done()
+            definitiveErrors++

             jobError := JobError{
                 ShouldCancelJob: false,
                 Msg:             fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
@@ -57,6 +61,20 @@ func ExtractorErrorHandler(
                 return
             }

+            if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
+                fatalError := JobError{
+                    ShouldCancelJob: true,
+                    Msg:             fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
+                    Prev:            &err,
+                }
+                select {
+                case chJobErrorsOut <- fatalError:
+                case <-ctx.Done():
+                    return
+                }
+            }
+
             continue
         } else {
             jobError := JobError{

View File

@@ -21,11 +21,14 @@ func (e *LoaderError) Error() string {
 func LoaderErrorHandler(
     ctx context.Context,
     retryConfig config.RetryConfig,
+    maxChunkErrors int,
     chErrorsIn <-chan LoaderError,
     chBatchesOut chan<- models.Batch,
     chJobErrorsOut chan<- JobError,
     wgActiveBatches *sync.WaitGroup,
 ) {
+    definitiveErrors := 0
+
     for {
         if ctx.Err() != nil {
             return
@@ -42,6 +45,7 @@ func LoaderErrorHandler(
         if err.Batch.RetryCounter >= retryConfig.Attempts {
             wgActiveBatches.Done()
+            definitiveErrors++

             jobError := JobError{
                 ShouldCancelJob: false,
                 Msg:             fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
@@ -54,6 +58,20 @@ func LoaderErrorHandler(
                 return
             }

+            if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
+                fatalError := JobError{
+                    ShouldCancelJob: true,
+                    Msg:             fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
+                    Prev:            &err,
+                }
+                select {
+                case chJobErrorsOut <- fatalError:
+                case <-ctx.Done():
+                    return
+                }
+            }
+
             continue
         } else {
             jobError := JobError{

internal/app/db/mssql.go Normal file
View File

@@ -0,0 +1,30 @@
+package db
+
+import (
+    "context"
+    "database/sql"
+)
+
+type MssqlDbWrapper struct {
+    db *sql.DB
+}
+
+func NewMssqlDbWrapper(db *sql.DB) DbWrapper {
+    return &MssqlDbWrapper{db: db}
+}
+
+func (wrapper *MssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
+    result, execErr := wrapper.db.ExecContext(ctx, query, args...)
+    if execErr != nil {
+        return DbWrapperResult{}, execErr
+    }
+
+    affectedRows, err := result.RowsAffected()
+    if err != nil {
+        return DbWrapperResult{}, err
+    }
+
+    return DbWrapperResult{
+        AffectedRows: affectedRows,
+    }, nil
+}

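MssqlDbWrapper is the *sql.DB twin of the PostgresDbWrapper added below; both hide their driver behind the DbWrapper interface from types.go. A hypothetical call site — connections, the helper name, and import paths are assumptions, not part of this diff:

package main

import (
    "context"
    "database/sql"
    "log"

    "github.com/jackc/pgx/v5/pgxpool" // pgx version assumed

    "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
)

// runHook executes one pre/post-SQL statement against both engines through
// the shared interface; sourceDb and targetDb are assumed opened elsewhere.
func runHook(ctx context.Context, sourceDb *sql.DB, targetDb *pgxpool.Pool, query string) {
    wrappers := []db.DbWrapper{
        db.NewMssqlDbWrapper(sourceDb),
        db.NewPostgresDbWrapper(targetDb),
    }
    for _, w := range wrappers {
        res, err := w.Exec(ctx, query)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("affected rows: %d", res.AffectedRows)
    }
}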
View File

@@ -26,3 +26,22 @@ func Close(pool *pgxpool.Pool) {
         pool.Close()
     }
 }
+
+type PostgresDbWrapper struct {
+    db *pgxpool.Pool
+}
+
+func NewPostgresDbWrapper(db *pgxpool.Pool) DbWrapper {
+    return &PostgresDbWrapper{db: db}
+}
+
+func (wrapper *PostgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
+    result, err := wrapper.db.Exec(ctx, query, args...)
+    if err != nil {
+        return DbWrapperResult{}, err
+    }
+
+    return DbWrapperResult{
+        AffectedRows: result.RowsAffected(),
+    }, nil
+}

internal/app/db/types.go Normal file
View File

@@ -0,0 +1,11 @@
+package db
+
+import "context"
+
+type DbWrapperResult struct {
+    AffectedRows int64
+}
+
+type DbWrapper interface {
+    Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error)
+}
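
A side benefit of introducing the interface: the new pre/post-SQL path in processMigrationJob can be exercised without a live database. A hypothetical fake — this type does not exist in the repository:

package db

import "context"

// FakeDbWrapper records every executed query and can simulate failures.
// Purely illustrative; not part of this commit.
type FakeDbWrapper struct {
    Queries []string
    Err     error // returned from Exec when non-nil
}

func (f *FakeDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
    if f.Err != nil {
        return DbWrapperResult{}, f.Err
    }
    f.Queries = append(f.Queries, query)
    return DbWrapperResult{AffectedRows: 1}, nil
}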