Compare commits
2 Commits
d32d4df6e4
...
e158986947
| Author | SHA1 | Date | |
|---|---|---|---|
|
e158986947
|
|||
|
bc6f9a6a70
|
@@ -19,10 +19,15 @@ func (e *ExtractorError) Error() string {
|
|||||||
|
|
||||||
const maxRetryAttempts = 3
|
const maxRetryAttempts = 3
|
||||||
|
|
||||||
func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chGlobalErrorsOut chan<- error) {
|
func extractorErrorHandler(chErrorsIn <-chan ExtractorError, chBatchesOut chan<- Batch, chJobErrorsOut chan<- JobError) {
|
||||||
for err := range chErrorsIn {
|
for err := range chErrorsIn {
|
||||||
if err.RetryCounter >= maxRetryAttempts {
|
if err.RetryCounter >= maxRetryAttempts {
|
||||||
chGlobalErrorsOut <- fmt.Errorf("batch %v reached max retries (%d): %s", err.Id, maxRetryAttempts, err.Msg)
|
jobError := JobError{
|
||||||
|
ShouldCancelJob: false,
|
||||||
|
Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts),
|
||||||
|
Prev: &err,
|
||||||
|
}
|
||||||
|
chJobErrorsOut <- jobError
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -23,20 +23,18 @@ func extractFromMssql(
|
|||||||
chBatchesIn <-chan Batch,
|
chBatchesIn <-chan Batch,
|
||||||
chChunksOut chan<- []UnknownRowValues,
|
chChunksOut chan<- []UnknownRowValues,
|
||||||
chErrorsOut chan<- ExtractorError,
|
chErrorsOut chan<- ExtractorError,
|
||||||
|
chJobErrorsOut chan<- JobError,
|
||||||
) {
|
) {
|
||||||
indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
|
indexPrimaryKey := slices.IndexFunc(columns, func(col ColumnType) bool {
|
||||||
return strings.EqualFold(col.name, job.PrimaryKey)
|
return strings.EqualFold(col.name, job.PrimaryKey)
|
||||||
})
|
})
|
||||||
|
|
||||||
if indexPrimaryKey == -1 {
|
if indexPrimaryKey == -1 {
|
||||||
exError := ExtractorError{
|
exError := JobError{
|
||||||
Batch: Batch{
|
ShouldCancelJob: true,
|
||||||
RetryCounter: maxRetryAttempts,
|
Msg: "Primary key not found in provided columns",
|
||||||
},
|
|
||||||
HasLastId: false,
|
|
||||||
Msg: "Primary key not found in columns provided",
|
|
||||||
}
|
}
|
||||||
chErrorsOut <- exError
|
chJobErrorsOut <- exError
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -91,6 +89,7 @@ func extractFromMssql(
|
|||||||
}
|
}
|
||||||
|
|
||||||
lastRow := rowsChunk[len(rowsChunk)-1]
|
lastRow := rowsChunk[len(rowsChunk)-1]
|
||||||
|
chChunksOut <- rowsChunk
|
||||||
chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
|
chErrorsOut <- ExtractorErrorFromLastRowMssql(lastRow, indexPrimaryKey, &batch, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
33
cmd/go_migrate/job-error-handler.go
Normal file
33
cmd/go_migrate/job-error-handler.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobError struct {
|
||||||
|
ShouldCancelJob bool
|
||||||
|
Msg string
|
||||||
|
Prev error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *JobError) Error() string {
|
||||||
|
if e.Prev != nil {
|
||||||
|
return fmt.Sprintf("%s: %v", e.Msg, e.Prev)
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.Msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func jobErrorHandler(chErrorsIn <-chan JobError) error {
|
||||||
|
for err := range chErrorsIn {
|
||||||
|
if err.ShouldCancelJob {
|
||||||
|
return &err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -24,18 +24,31 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
|
|||||||
logColumnTypes(sourceColTypes, "Source col types")
|
logColumnTypes(sourceColTypes, "Source col types")
|
||||||
logColumnTypes(targetColTypes, "Target col types")
|
logColumnTypes(targetColTypes, "Target col types")
|
||||||
|
|
||||||
mssqlCtx := context.Background()
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
batches, err := batchGeneratorMssql(mssqlCtx, sourceDb, job)
|
defer cancel()
|
||||||
|
|
||||||
|
batches, err := batchGeneratorMssql(ctx, sourceDb, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unexpected error calculating batch ranges: ", err)
|
log.Error("Unexpected error calculating batch ranges: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
chGlobalErrors := make(chan error)
|
chJobErrors := make(chan JobError)
|
||||||
defer close(chGlobalErrors)
|
defer close(chJobErrors)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := jobErrorHandler(chJobErrors); err != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
chBatches := make(chan Batch, len(batches))
|
chBatches := make(chan Batch, len(batches))
|
||||||
chChunks := make(chan []UnknownRowValues, QueueSize)
|
|
||||||
chExtractorErrors := make(chan ExtractorError, len(batches))
|
chExtractorErrors := make(chan ExtractorError, len(batches))
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
extractorErrorHandler(chExtractorErrors, chBatches, chJobErrors)
|
||||||
|
}()
|
||||||
|
|
||||||
|
chChunks := make(chan []UnknownRowValues, QueueSize)
|
||||||
maxExtractors := min(NumExtractors, len(batches))
|
maxExtractors := min(NumExtractors, len(batches))
|
||||||
var wgMssqlExtractors sync.WaitGroup
|
var wgMssqlExtractors sync.WaitGroup
|
||||||
|
|
||||||
@@ -43,7 +56,7 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
|
|||||||
extractStartTime := time.Now()
|
extractStartTime := time.Now()
|
||||||
for range maxExtractors {
|
for range maxExtractors {
|
||||||
wgMssqlExtractors.Go(func() {
|
wgMssqlExtractors.Go(func() {
|
||||||
extractFromMssql(mssqlCtx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors)
|
extractFromMssql(ctx, sourceDb, job, sourceColTypes, ChunkSize, chBatches, chChunks, chExtractorErrors, chJobErrors)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -55,10 +68,6 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
|
|||||||
close(chExtractorErrors)
|
close(chExtractorErrors)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
|
||||||
extractorErrorHandler(chExtractorErrors, chBatches, chGlobalErrors)
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wgMssqlExtractors.Wait()
|
wgMssqlExtractors.Wait()
|
||||||
close(chChunks)
|
close(chChunks)
|
||||||
@@ -83,17 +92,15 @@ func processMigrationJob(sourceDb *sql.DB, targetDb *pgxpool.Pool, job Migration
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
var wgPostgresLoaders sync.WaitGroup
|
var wgPostgresLoaders sync.WaitGroup
|
||||||
postgresLoaderCtx := context.Background()
|
|
||||||
|
|
||||||
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
|
log.Infof("Starting %d PostgreSQL loader(s)...", NumLoaders)
|
||||||
loaderStartTime := time.Now()
|
loaderStartTime := time.Now()
|
||||||
|
|
||||||
for range NumLoaders {
|
for range NumLoaders {
|
||||||
wgPostgresLoaders.Go(func() {
|
wgPostgresLoaders.Go(func() {
|
||||||
if err := loadRowsPostgres(postgresLoaderCtx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
|
if err := loadRowsPostgres(ctx, job, targetColTypes, targetDb, chRowsTransform); err != nil {
|
||||||
log.Error("Unexpected error loading data into postgres: ", err)
|
log.Error("Unexpected error loading data into postgres: ", err)
|
||||||
}
|
}
|
||||||
// fakeLoader(job, sourceColTypes, chRowsTransform)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user