diff --git a/cmd/go_migrate/process.go b/cmd/go_migrate/process.go index cd1f85b..1e35c33 100644 --- a/cmd/go_migrate/process.go +++ b/cmd/go_migrate/process.go @@ -13,6 +13,7 @@ import ( dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/extractors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/loaders" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/table_analyzers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl/transformers" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" @@ -43,7 +44,7 @@ func processMigrationJob( targetTableAnalyzer etl.TableAnalyzer, extractor extractors.GenericExtractor, azureClient *azure.Client, - loader etl.Loader, + loader loaders.GenericLoader, job config.Job, targetDbType string, ) models.JobResult { @@ -189,12 +190,12 @@ func processMigrationJob( for range job.MaxLoaders { wgLoaders.Go(func() { - loader.Exec( + loader.Consume( localCtx, job.TargetTable, targetColTypes, + job.Retry, chBatchesTransformed, - chLoadersErrors, chJobErrors, &wgActiveBatches, &rowsLoaded, diff --git a/internal/app/etl/loaders/consume.go b/internal/app/etl/loaders/consume.go index 374a15b..d634125 100644 --- a/internal/app/etl/loaders/consume.go +++ b/internal/app/etl/loaders/consume.go @@ -38,7 +38,7 @@ func (gl *GenericLoader) Consume( return } - processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch) + processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch) wgActiveBatches.Done() if err != nil { diff --git a/internal/app/etl/loaders/main.go b/internal/app/etl/loaders/main.go index 39d06aa..1b6d20e 100644 --- a/internal/app/etl/loaders/main.go +++ b/internal/app/etl/loaders/main.go @@ -1,116 +1,13 @@ package loaders import ( - "context" - "errors" - "fmt" - "sync" - "sync/atomic" - - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" - "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" - "github.com/jackc/pgx/v5/pgconn" ) type GenericLoader struct { db dbwrapper.DbWrapper } -func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader { - return &GenericLoader{db: db} -} - -func (gl *GenericLoader) ProcessBatch( - ctx context.Context, - tableInfo config.TargetTableInfo, - colNames []string, - batch models.Batch, -) (int, error) { - _, err := gl.db.SaveMassive( - ctx, - tableInfo.Schema, - tableInfo.Table, - colNames, - batch.Rows, - ) - - if err != nil { - if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok { - if pgErr.Code == "23505" { - return 0, &custom_errors.JobError{ - ShouldCancelJob: true, - Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table), - Prev: err, - } - } - } - - return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()} - } - - return len(batch.Rows), nil -} - -func (gl *GenericLoader) Exec( - ctx context.Context, - tableInfo config.TargetTableInfo, - columns []models.ColumnType, - chBatchesIn <-chan models.Batch, - chErrorsOut chan<- custom_errors.LoaderError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveBatches *sync.WaitGroup, - rowsLoaded *int64, -) { - colNames := mapSlice(columns, func(col models.ColumnType) string { - return col.Name() - }) - - for { - if ctx.Err() != nil { - return - } - - select { - case <-ctx.Done(): - return - case batch, ok := <-chBatchesIn: - if !ok { - return - } - - processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch) - - if err != nil { - var ldError *custom_errors.LoaderError - var jobError *custom_errors.JobError - if errors.As(err, &ldError) { - select { - case <-ctx.Done(): - return - case chErrorsOut <- *ldError: - } - } else if errors.As(err, &jobError) { - select { - case <-ctx.Done(): - return - case chJobErrorsOut <- *jobError: - } - } else { - select { - case <-ctx.Done(): - return - case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}: - } - } - - continue - } - - wgActiveBatches.Done() - atomic.AddInt64(rowsLoaded, int64(processedRows)) - } - } +func NewGenericLoader(db dbwrapper.DbWrapper) GenericLoader { + return GenericLoader{db: db} } diff --git a/internal/app/etl/loaders/process-with-retries.go b/internal/app/etl/loaders/process-with-retries.go new file mode 100644 index 0000000..8c1179c --- /dev/null +++ b/internal/app/etl/loaders/process-with-retries.go @@ -0,0 +1,50 @@ +package loaders + +import ( + "context" + "errors" + "fmt" + "time" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" +) + +func (gl *GenericLoader) ProcessBatchWithRetries( + ctx context.Context, + tableInfo config.TargetTableInfo, + colNames []string, + retryConfig config.RetryConfig, + batch models.Batch, +) (int64, error) { + for { + rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch) + if err == nil { + return rowsLoaded, nil + } + + if btError, ok := errors.AsType[*custom_errors.LoaderError](err); ok { + batch.RetryCounter++ + + if batch.RetryCounter >= retryConfig.Attempts { + return rowsLoaded, &custom_errors.JobError{ + ShouldCancelJob: false, + Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", btError.Batch.Id, btError.Batch.RetryCounter), + Prev: btError, + } + } + + delay := custom_errors.ComputeBackoffDelay( + batch.RetryCounter, + retryConfig.BaseDelayMs, + retryConfig.MaxDelayMs, + retryConfig.MaxJitterMs, + ) + time.Sleep(delay) + continue + } + + return rowsLoaded, err + } +} diff --git a/internal/app/etl/loaders/process.go b/internal/app/etl/loaders/process.go new file mode 100644 index 0000000..4e48988 --- /dev/null +++ b/internal/app/etl/loaders/process.go @@ -0,0 +1,43 @@ +package loaders + +import ( + "context" + "errors" + "fmt" + + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" + "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" + "github.com/jackc/pgx/v5/pgconn" +) + +func (gl *GenericLoader) ProcessBatch( + ctx context.Context, + tableInfo config.TargetTableInfo, + colNames []string, + batch models.Batch, +) (int64, error) { + _, err := gl.db.SaveMassive( + ctx, + tableInfo.Schema, + tableInfo.Table, + colNames, + batch.Rows, + ) + + if err != nil { + if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok { + if pgErr.Code == "23505" { + return 0, &custom_errors.JobError{ + ShouldCancelJob: true, + Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table), + Prev: err, + } + } + } + + return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()} + } + + return int64(len(batch.Rows)), nil +} diff --git a/internal/app/etl/types.go b/internal/app/etl/types.go index b2eb87b..cb1bc1a 100644 --- a/internal/app/etl/types.go +++ b/internal/app/etl/types.go @@ -51,18 +51,7 @@ type Loader interface { tableInfo config.TargetTableInfo, colNames []string, batch models.Batch, - ) (int, error) - - Exec( - ctx context.Context, - tableInfo config.TargetTableInfo, - columns []models.ColumnType, - chBatchesIn <-chan models.Batch, - chErrorsOut chan<- custom_errors.LoaderError, - chJobErrorsOut chan<- custom_errors.JobError, - wgActiveBatches *sync.WaitGroup, - rowsLoaded *int64, - ) + ) (int64, error) } type TableAnalyzer interface {