Compare commits
2 Commits
f126d5bbd0
...
33c9cd9c3e
| Author | SHA1 | Date | |
|---|---|---|---|
|
33c9cd9c3e
|
|||
|
85074da2ec
|
@@ -13,5 +13,5 @@ func configureLog() {
|
|||||||
DisableSorting: false,
|
DisableSorting: false,
|
||||||
PadLevelText: true,
|
PadLevelText: true,
|
||||||
})
|
})
|
||||||
log.SetLevel(log.InfoLevel)
|
log.SetLevel(log.DebugLevel)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
||||||
@@ -49,8 +50,10 @@ func main() {
|
|||||||
status := "OK"
|
status := "OK"
|
||||||
if res.Error != nil {
|
if res.Error != nil {
|
||||||
status = "FAILED"
|
status = "FAILED"
|
||||||
}
|
log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v | Error: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration, res.Error)
|
||||||
|
} else {
|
||||||
log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
|
log.Infof("[%s] Status: %s | Read: %d | Loaded: %d | Errors: %d | Time: %v", res.JobName, status, res.RowsRead, res.RowsLoaded, res.RowsFailed, res.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
totalProcessed += res.RowsLoaded
|
totalProcessed += res.RowsLoaded
|
||||||
if res.Error != nil {
|
if res.Error != nil {
|
||||||
@@ -91,6 +94,7 @@ func processMigrationJobs(
|
|||||||
chJobs := make(chan config.Job, len(jobs))
|
chJobs := make(chan config.Job, len(jobs))
|
||||||
var wgJobs sync.WaitGroup
|
var wgJobs sync.WaitGroup
|
||||||
|
|
||||||
|
targetDbWrapper := db.NewPostgresDbWrapper(targetDb)
|
||||||
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
|
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
|
||||||
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
||||||
extractor := extractors.NewMssqlExtractor(sourceDb)
|
extractor := extractors.NewMssqlExtractor(sourceDb)
|
||||||
@@ -103,6 +107,7 @@ func processMigrationJobs(
|
|||||||
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
|
log.Infof("[worker %d] >>> Processing job: %s.%s <<<", i, job.SourceTable.Schema, job.SourceTable.Table)
|
||||||
res := processMigrationJob(
|
res := processMigrationJob(
|
||||||
ctx,
|
ctx,
|
||||||
|
targetDbWrapper,
|
||||||
sourceTableAnalyzer,
|
sourceTableAnalyzer,
|
||||||
targetTableAnalyzer,
|
targetTableAnalyzer,
|
||||||
extractor,
|
extractor,
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
@@ -17,6 +18,8 @@ import (
|
|||||||
|
|
||||||
func processMigrationJob(
|
func processMigrationJob(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
// sourceDbWrapper db.DbWrapper,
|
||||||
|
targetDbWrapper db.DbWrapper,
|
||||||
sourceTableAnalyzer etl.TableAnalyzer,
|
sourceTableAnalyzer etl.TableAnalyzer,
|
||||||
targetTableAnalyzer etl.TableAnalyzer,
|
targetTableAnalyzer etl.TableAnalyzer,
|
||||||
extractor etl.Extractor,
|
extractor etl.Extractor,
|
||||||
@@ -24,7 +27,7 @@ func processMigrationJob(
|
|||||||
loader etl.Loader,
|
loader etl.Loader,
|
||||||
job config.Job,
|
job config.Job,
|
||||||
) JobResult {
|
) JobResult {
|
||||||
jobCtx, cancel := context.WithCancel(ctx)
|
localCtx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
result := JobResult{
|
result := JobResult{
|
||||||
@@ -39,7 +42,7 @@ func processMigrationJob(
|
|||||||
|
|
||||||
wgQueryColumnTypes.Go(func() error {
|
wgQueryColumnTypes.Go(func() error {
|
||||||
var err error
|
var err error
|
||||||
sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(jobCtx, job.SourceTable.TableInfo)
|
sourceColTypes, err = sourceTableAnalyzer.QueryColumnTypes(localCtx, job.SourceTable.TableInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -49,7 +52,7 @@ func processMigrationJob(
|
|||||||
|
|
||||||
wgQueryColumnTypes.Go(func() error {
|
wgQueryColumnTypes.Go(func() error {
|
||||||
var err error
|
var err error
|
||||||
targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(jobCtx, job.TargetTable.TableInfo)
|
targetColTypes, err = targetTableAnalyzer.QueryColumnTypes(localCtx, job.TargetTable.TableInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -63,8 +66,15 @@ func processMigrationJob(
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, query := range job.PreSQL {
|
||||||
|
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
|
||||||
|
result.Error = err
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
partitions, err := table_analyzers.PartitionRangeGenerator(
|
partitions, err := table_analyzers.PartitionRangeGenerator(
|
||||||
jobCtx,
|
localCtx,
|
||||||
sourceTableAnalyzer,
|
sourceTableAnalyzer,
|
||||||
job.SourceTable.TableInfo,
|
job.SourceTable.TableInfo,
|
||||||
job.SourceTable.PrimaryKey,
|
job.SourceTable.PrimaryKey,
|
||||||
@@ -88,7 +98,7 @@ func processMigrationJob(
|
|||||||
var wgLoaders sync.WaitGroup
|
var wgLoaders sync.WaitGroup
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := custom_errors.JobErrorHandler(jobCtx, chJobErrors); err != nil {
|
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
|
||||||
log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
|
log.Error("Fatal error received from JobErrorHandler, canceling job... - ", err)
|
||||||
cancel()
|
cancel()
|
||||||
result.Error = err
|
result.Error = err
|
||||||
@@ -96,16 +106,18 @@ func processMigrationJob(
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
go custom_errors.ExtractorErrorHandler(
|
go custom_errors.ExtractorErrorHandler(
|
||||||
jobCtx,
|
localCtx,
|
||||||
job.Retry,
|
job.Retry,
|
||||||
|
job.MaxPartitionErrrors,
|
||||||
chExtractorErrors,
|
chExtractorErrors,
|
||||||
chPartitions,
|
chPartitions,
|
||||||
chJobErrors,
|
chJobErrors,
|
||||||
&wgActivePartitions,
|
&wgActivePartitions,
|
||||||
)
|
)
|
||||||
go custom_errors.LoaderErrorHandler(
|
go custom_errors.LoaderErrorHandler(
|
||||||
jobCtx,
|
localCtx,
|
||||||
job.Retry,
|
job.Retry,
|
||||||
|
job.MaxChunkErrors,
|
||||||
chLoadersErrors,
|
chLoadersErrors,
|
||||||
chBatchesTransformed,
|
chBatchesTransformed,
|
||||||
chJobErrors,
|
chJobErrors,
|
||||||
@@ -118,7 +130,7 @@ func processMigrationJob(
|
|||||||
for range maxExtractors {
|
for range maxExtractors {
|
||||||
wgExtractors.Go(func() {
|
wgExtractors.Go(func() {
|
||||||
extractor.Exec(
|
extractor.Exec(
|
||||||
jobCtx,
|
localCtx,
|
||||||
job.SourceTable,
|
job.SourceTable,
|
||||||
sourceColTypes,
|
sourceColTypes,
|
||||||
job.BatchSize,
|
job.BatchSize,
|
||||||
@@ -144,7 +156,7 @@ func processMigrationJob(
|
|||||||
for range maxExtractors {
|
for range maxExtractors {
|
||||||
wgTransformers.Go(func() {
|
wgTransformers.Go(func() {
|
||||||
transformer.Exec(
|
transformer.Exec(
|
||||||
jobCtx,
|
localCtx,
|
||||||
sourceColTypes,
|
sourceColTypes,
|
||||||
chBatchesRaw,
|
chBatchesRaw,
|
||||||
chBatchesTransformed,
|
chBatchesTransformed,
|
||||||
@@ -159,7 +171,7 @@ func processMigrationJob(
|
|||||||
for range job.MaxLoaders {
|
for range job.MaxLoaders {
|
||||||
wgLoaders.Go(func() {
|
wgLoaders.Go(func() {
|
||||||
loader.Exec(
|
loader.Exec(
|
||||||
jobCtx,
|
localCtx,
|
||||||
job.TargetTable,
|
job.TargetTable,
|
||||||
targetColTypes,
|
targetColTypes,
|
||||||
chBatchesTransformed,
|
chBatchesTransformed,
|
||||||
@@ -202,8 +214,15 @@ func processMigrationJob(
|
|||||||
cancel()
|
cancel()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
for _, query := range job.PostSQL {
|
||||||
|
if _, err := targetDbWrapper.Exec(localCtx, query); err != nil {
|
||||||
|
result.Error = err
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Debugf("waiting for local context to be done (%v)", job.Name)
|
log.Debugf("waiting for local context to be done (%v)", job.Name)
|
||||||
<-jobCtx.Done()
|
<-localCtx.Done()
|
||||||
log.Debugf("local context done (%v)", job.Name)
|
log.Debugf("local context done (%v)", job.Name)
|
||||||
|
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ defaults:
|
|||||||
batches_per_partition: 8
|
batches_per_partition: 8
|
||||||
truncate_target: true
|
truncate_target: true
|
||||||
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
||||||
|
max_partition_errrors: 5
|
||||||
|
max_chunk_errors: 5
|
||||||
retry:
|
retry:
|
||||||
attempts: 3
|
attempts: 3
|
||||||
base_delay_ms: 500
|
base_delay_ms: 500
|
||||||
@@ -26,6 +28,8 @@ jobs:
|
|||||||
target:
|
target:
|
||||||
schema: Cartografia
|
schema: Cartografia
|
||||||
table: MANZANA
|
table: MANZANA
|
||||||
|
pre_sql:
|
||||||
|
- 'SELECT 1'
|
||||||
|
|
||||||
- name: red_puerto
|
- name: red_puerto
|
||||||
enabled: true
|
enabled: true
|
||||||
@@ -36,3 +40,7 @@ jobs:
|
|||||||
target:
|
target:
|
||||||
schema: Red
|
schema: Red
|
||||||
table: PUERTO
|
table: PUERTO
|
||||||
|
pre_sql:
|
||||||
|
- 'SELECT 1'
|
||||||
|
post_sql:
|
||||||
|
- "SELECT 1"
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ type JobConfig struct {
|
|||||||
BatchesPerPartition int `yaml:"batches_per_partition"`
|
BatchesPerPartition int `yaml:"batches_per_partition"`
|
||||||
TruncateTarget bool `yaml:"truncate_target"`
|
TruncateTarget bool `yaml:"truncate_target"`
|
||||||
TruncateMethod string `yaml:"truncate_method"`
|
TruncateMethod string `yaml:"truncate_method"`
|
||||||
|
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
|
||||||
|
MaxChunkErrors int `yaml:"max_chunk_errors"`
|
||||||
Retry RetryConfig `yaml:"retry"`
|
Retry RetryConfig `yaml:"retry"`
|
||||||
RowsPerPartition int64
|
RowsPerPartition int64
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,11 +24,14 @@ func (e *ExtractorError) Error() string {
|
|||||||
func ExtractorErrorHandler(
|
func ExtractorErrorHandler(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
retryConfig config.RetryConfig,
|
retryConfig config.RetryConfig,
|
||||||
|
maxPartitionErrors int,
|
||||||
chErrorsIn <-chan ExtractorError,
|
chErrorsIn <-chan ExtractorError,
|
||||||
chPartitionsOut chan<- models.Partition,
|
chPartitionsOut chan<- models.Partition,
|
||||||
chJobErrorsOut chan<- JobError,
|
chJobErrorsOut chan<- JobError,
|
||||||
wgActivePartitions *sync.WaitGroup,
|
wgActivePartitions *sync.WaitGroup,
|
||||||
) {
|
) {
|
||||||
|
definitiveErrors := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return
|
return
|
||||||
@@ -45,6 +48,7 @@ func ExtractorErrorHandler(
|
|||||||
|
|
||||||
if err.Partition.RetryCounter >= retryConfig.Attempts {
|
if err.Partition.RetryCounter >= retryConfig.Attempts {
|
||||||
wgActivePartitions.Done()
|
wgActivePartitions.Done()
|
||||||
|
definitiveErrors++
|
||||||
jobError := JobError{
|
jobError := JobError{
|
||||||
ShouldCancelJob: false,
|
ShouldCancelJob: false,
|
||||||
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
|
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
|
||||||
@@ -57,6 +61,20 @@ func ExtractorErrorHandler(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
|
||||||
|
fatalError := JobError{
|
||||||
|
ShouldCancelJob: true,
|
||||||
|
Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
|
||||||
|
Prev: &err,
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case chJobErrorsOut <- fatalError:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
jobError := JobError{
|
jobError := JobError{
|
||||||
|
|||||||
@@ -21,11 +21,14 @@ func (e *LoaderError) Error() string {
|
|||||||
func LoaderErrorHandler(
|
func LoaderErrorHandler(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
retryConfig config.RetryConfig,
|
retryConfig config.RetryConfig,
|
||||||
|
maxChunkErrors int,
|
||||||
chErrorsIn <-chan LoaderError,
|
chErrorsIn <-chan LoaderError,
|
||||||
chBatchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
chJobErrorsOut chan<- JobError,
|
chJobErrorsOut chan<- JobError,
|
||||||
wgActiveBatches *sync.WaitGroup,
|
wgActiveBatches *sync.WaitGroup,
|
||||||
) {
|
) {
|
||||||
|
definitiveErrors := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return
|
return
|
||||||
@@ -42,6 +45,7 @@ func LoaderErrorHandler(
|
|||||||
|
|
||||||
if err.Batch.RetryCounter >= retryConfig.Attempts {
|
if err.Batch.RetryCounter >= retryConfig.Attempts {
|
||||||
wgActiveBatches.Done()
|
wgActiveBatches.Done()
|
||||||
|
definitiveErrors++
|
||||||
jobError := JobError{
|
jobError := JobError{
|
||||||
ShouldCancelJob: false,
|
ShouldCancelJob: false,
|
||||||
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
|
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
|
||||||
@@ -54,6 +58,20 @@ func LoaderErrorHandler(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
|
||||||
|
fatalError := JobError{
|
||||||
|
ShouldCancelJob: true,
|
||||||
|
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
|
||||||
|
Prev: &err,
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case chJobErrorsOut <- fatalError:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
jobError := JobError{
|
jobError := JobError{
|
||||||
|
|||||||
30
internal/app/db/mssql.go
Normal file
30
internal/app/db/mssql.go
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MssqlDbWrapper struct {
|
||||||
|
db *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMssqlDbWrapper(db *sql.DB) DbWrapper {
|
||||||
|
return &MssqlDbWrapper{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wrapper *MssqlDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
|
||||||
|
result, execErr := wrapper.db.ExecContext(ctx, query, args...)
|
||||||
|
if execErr != nil {
|
||||||
|
return DbWrapperResult{}, execErr
|
||||||
|
}
|
||||||
|
|
||||||
|
affectedRows, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return DbWrapperResult{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return DbWrapperResult{
|
||||||
|
AffectedRows: affectedRows,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
@@ -26,3 +26,22 @@ func Close(pool *pgxpool.Pool) {
|
|||||||
pool.Close()
|
pool.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PostgresDbWrapper struct {
|
||||||
|
db *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPostgresDbWrapper(db *pgxpool.Pool) DbWrapper {
|
||||||
|
return &PostgresDbWrapper{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wrapper *PostgresDbWrapper) Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error) {
|
||||||
|
result, err := wrapper.db.Exec(ctx, query, args...)
|
||||||
|
if err != nil {
|
||||||
|
return DbWrapperResult{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return DbWrapperResult{
|
||||||
|
AffectedRows: result.RowsAffected(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|||||||
11
internal/app/db/types.go
Normal file
11
internal/app/db/types.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type DbWrapperResult struct {
|
||||||
|
AffectedRows int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type DbWrapper interface {
|
||||||
|
Exec(ctx context.Context, query string, args ...any) (DbWrapperResult, error)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user