diff --git a/cmd/go_migrate/batch-generator.go b/cmd/go_migrate/batch-generator.go index 90791e9..cbba562 100644 --- a/cmd/go_migrate/batch-generator.go +++ b/cmd/go_migrate/batch-generator.go @@ -11,16 +11,6 @@ import ( "github.com/google/uuid" ) -type Batch struct { - Id uuid.UUID - ParentId uuid.UUID - LowerLimit int64 - UpperLimit int64 - IsLowerLimitInclusive bool - ShouldUseRange bool - RetryCounter int -} - func estimateTotalRowsMssql(ctx context.Context, db *sql.DB, tableInfo config.SourceTableInfo) (int64, error) { query := ` SELECT diff --git a/cmd/go_migrate/extractor-error-handler.go b/cmd/go_migrate/extractor-error-handler.go deleted file mode 100644 index 86cffa4..0000000 --- a/cmd/go_migrate/extractor-error-handler.go +++ /dev/null @@ -1,103 +0,0 @@ -package main - -import ( - "context" - "fmt" - "sync" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/google/uuid" -) - -type ExtractorError struct { - Batch - LastId int64 - HasLastId bool - Msg string -} - -func (e *ExtractorError) Error() string { - return e.Msg -} - -const maxRetryAttempts = 3 - -func extractorErrorHandler( - ctx context.Context, - chErrorsIn <-chan ExtractorError, - chBatchesOut chan<- Batch, - chJobErrorsOut chan<- JobError, - wgActiveBatches *sync.WaitGroup, -) { - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - - case err, ok := <-chErrorsIn: - if !ok { - return - } - - if err.RetryCounter >= maxRetryAttempts { - jobError := JobError{ - ShouldCancelJob: false, - Msg: fmt.Sprintf("batch %v reached max retries (%d)", err.Id, maxRetryAttempts), - Prev: &err, - } - - select { - case chJobErrorsOut <- jobError: - case <-ctx.Done(): - return - } - - wgActiveBatches.Done() - continue - } - - newBatch := err.Batch - newBatch.RetryCounter++ - - if err.HasLastId { - newBatch.ParentId = err.Id - newBatch.Id = uuid.New() - newBatch.LowerLimit = err.LastId - newBatch.IsLowerLimitInclusive = false - } - - select { - case chBatchesOut <- newBatch: - case <-ctx.Done(): - return - } - } - } -} - -func ExtractorErrorFromLastRowMssql(lastRow models.UnknownRowValues, indexPrimaryKey int, batch *Batch, previousError error) ExtractorError { - lastIdRawValue := lastRow[indexPrimaryKey] - - lastId, ok := ToInt64(lastIdRawValue) - if !ok { - currentBatch := *batch - currentBatch.RetryCounter = maxRetryAttempts - return ExtractorError{ - Batch: currentBatch, - HasLastId: true, - Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()), - } - - } - - return ExtractorError{ - Batch: *batch, - HasLastId: true, - LastId: lastId, - Msg: previousError.Error(), - } -} diff --git a/cmd/go_migrate/loader-error-handler.go b/cmd/go_migrate/loader-error-handler.go index bb40a4c..edd3569 100644 --- a/cmd/go_migrate/loader-error-handler.go +++ b/cmd/go_migrate/loader-error-handler.go @@ -20,6 +20,7 @@ func (e *LoaderError) Error() string { func loaderErrorHandler( ctx context.Context, + maxRetryAttempts int, chErrorsIn <-chan LoaderError, chChunksOut chan<- models.Chunk, chJobErrorsOut chan<- custom_errors.JobError, diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index d3ee4a5..c5c7513 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -67,8 +67,8 @@ func processMigrationJob( } }() - go custom_errors.ExtractorErrorHandler(jobCtx, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) - go loaderErrorHandler(jobCtx, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) + go custom_errors.ExtractorErrorHandler(jobCtx, job.Retry.Attempts, chExtractorErrors, chBatches, chJobErrors, &wgActiveBatches) + go loaderErrorHandler(jobCtx, job.Retry.Attempts, chLoadersErrors, chChunksTransformed, chJobErrors, &wgActiveChunks) maxExtractors := min(job.MaxExtractors, len(batches)) log.Infof("Starting %d extractor(s)...", maxExtractors) diff --git a/internal/app/custom_errors/extractor.error.go b/internal/app/custom_errors/extractor.error.go index 5aafa24..4ff4c4f 100644 --- a/internal/app/custom_errors/extractor.error.go +++ b/internal/app/custom_errors/extractor.error.go @@ -20,10 +20,9 @@ func (e *ExtractorError) Error() string { return e.Msg } -const maxRetryAttempts = 3 - func ExtractorErrorHandler( ctx context.Context, + maxRetryAttempts int, chErrorsIn <-chan ExtractorError, chBatchesOut chan<- models.Batch, chJobErrorsOut chan<- JobError,