refactor: update GenericLoader to use ProcessBatchWithRetries; enhance error handling and retry logic
This commit is contained in:
@@ -38,7 +38,7 @@ func (gl *GenericLoader) Consume(
|
||||
return
|
||||
}
|
||||
|
||||
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
|
||||
wgActiveBatches.Done()
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -1,116 +1,13 @@
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
type GenericLoader struct {
|
||||
db dbwrapper.DbWrapper
|
||||
}
|
||||
|
||||
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
|
||||
return &GenericLoader{db: db}
|
||||
}
|
||||
|
||||
func (gl *GenericLoader) ProcessBatch(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
batch models.Batch,
|
||||
) (int, error) {
|
||||
_, err := gl.db.SaveMassive(
|
||||
ctx,
|
||||
tableInfo.Schema,
|
||||
tableInfo.Table,
|
||||
colNames,
|
||||
batch.Rows,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok {
|
||||
if pgErr.Code == "23505" {
|
||||
return 0, &custom_errors.JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
|
||||
Prev: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
|
||||
}
|
||||
|
||||
return len(batch.Rows), nil
|
||||
}
|
||||
|
||||
func (gl *GenericLoader) Exec(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
columns []models.ColumnType,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chErrorsOut chan<- custom_errors.LoaderError,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
rowsLoaded *int64,
|
||||
) {
|
||||
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
||||
return col.Name()
|
||||
})
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
|
||||
if err != nil {
|
||||
var ldError *custom_errors.LoaderError
|
||||
var jobError *custom_errors.JobError
|
||||
if errors.As(err, &ldError) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- *ldError:
|
||||
}
|
||||
} else if errors.As(err, &jobError) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chJobErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
wgActiveBatches.Done()
|
||||
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||
}
|
||||
}
|
||||
func NewGenericLoader(db dbwrapper.DbWrapper) GenericLoader {
|
||||
return GenericLoader{db: db}
|
||||
}
|
||||
|
||||
50
internal/app/etl/loaders/process-with-retries.go
Normal file
50
internal/app/etl/loaders/process-with-retries.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
func (gl *GenericLoader) ProcessBatchWithRetries(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
retryConfig config.RetryConfig,
|
||||
batch models.Batch,
|
||||
) (int64, error) {
|
||||
for {
|
||||
rowsLoaded, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
|
||||
if err == nil {
|
||||
return rowsLoaded, nil
|
||||
}
|
||||
|
||||
if btError, ok := errors.AsType[*custom_errors.LoaderError](err); ok {
|
||||
batch.RetryCounter++
|
||||
|
||||
if batch.RetryCounter >= retryConfig.Attempts {
|
||||
return rowsLoaded, &custom_errors.JobError{
|
||||
ShouldCancelJob: false,
|
||||
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", btError.Batch.Id, btError.Batch.RetryCounter),
|
||||
Prev: btError,
|
||||
}
|
||||
}
|
||||
|
||||
delay := custom_errors.ComputeBackoffDelay(
|
||||
batch.RetryCounter,
|
||||
retryConfig.BaseDelayMs,
|
||||
retryConfig.MaxDelayMs,
|
||||
retryConfig.MaxJitterMs,
|
||||
)
|
||||
time.Sleep(delay)
|
||||
continue
|
||||
}
|
||||
|
||||
return rowsLoaded, err
|
||||
}
|
||||
}
|
||||
43
internal/app/etl/loaders/process.go
Normal file
43
internal/app/etl/loaders/process.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
func (gl *GenericLoader) ProcessBatch(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
batch models.Batch,
|
||||
) (int64, error) {
|
||||
_, err := gl.db.SaveMassive(
|
||||
ctx,
|
||||
tableInfo.Schema,
|
||||
tableInfo.Table,
|
||||
colNames,
|
||||
batch.Rows,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if pgErr, ok := errors.AsType[*pgconn.PgError](err); ok {
|
||||
if pgErr.Code == "23505" {
|
||||
return 0, &custom_errors.JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
|
||||
Prev: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
|
||||
}
|
||||
|
||||
return int64(len(batch.Rows)), nil
|
||||
}
|
||||
Reference in New Issue
Block a user