Files

118 lines
2.5 KiB
Go

package loaders
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/jackc/pgx/v5/pgconn"
)
// GenericLoader loads batches of rows into a target table by delegating the
// write to a dbwrapper.DbWrapper. NewGenericLoader returns it as an etl.Loader.
type GenericLoader struct {
// db is the database wrapper that ProcessBatch calls SaveMassive on.
db dbwrapper.DbWrapper
}
// NewGenericLoader builds a GenericLoader that performs all of its writes
// through db and hands it back as an etl.Loader.
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
	loader := new(GenericLoader)
	loader.db = db
	return loader
}
// ProcessBatch writes the rows of batch into tableInfo.Schema.tableInfo.Table
// through the loader's database wrapper, using colNames as the column list.
//
// On success it returns len(batch.Rows); the count reported by SaveMassive is
// discarded. On failure it returns zero together with one of:
//   - *custom_errors.JobError with ShouldCancelJob set and Prev holding the
//     original error, when the failure is a PostgreSQL error whose SQLSTATE
//     is 23505 (unique_violation);
//   - *custom_errors.LoaderError carrying the batch and the error text, for
//     any other failure.
func (gl *GenericLoader) ProcessBatch(
	ctx context.Context,
	tableInfo config.TargetTableInfo,
	colNames []string,
	batch models.Batch,
) (int, error) {
	// SQLSTATE that PostgreSQL reports for a unique-constraint violation.
	const uniqueViolation = "23505"

	_, saveErr := gl.db.SaveMassive(ctx, tableInfo.Schema, tableInfo.Table, colNames, batch.Rows)
	if saveErr == nil {
		return len(batch.Rows), nil
	}

	// A duplicate key is treated as fatal for the whole job rather than as a
	// per-batch failure.
	var pgErr *pgconn.PgError
	if errors.As(saveErr, &pgErr) && pgErr.Code == uniqueViolation {
		fatal := &custom_errors.JobError{
			ShouldCancelJob: true,
			Msg:             fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
			Prev:            saveErr,
		}
		return 0, fatal
	}

	return 0, &custom_errors.LoaderError{Batch: batch, Msg: saveErr.Error()}
}
// Exec is the loader's worker loop. It receives batches from chBatchesIn and
// passes each one to ProcessBatch until ctx is cancelled or chBatchesIn is
// closed.
//
// For every batch that loads successfully, the processed row count is added
// atomically to *rowsLoaded and then wgActiveBatches.Done is called. The
// counter is updated before Done so that a goroutine returning from
// wgActiveBatches.Wait is guaranteed to observe the rows of the batch that
// released it.
//
// For every batch that fails, the error is forwarded and the loop moves on to
// the next batch:
//   - a *custom_errors.LoaderError is sent on chErrorsOut;
//   - a *custom_errors.JobError is sent on chJobErrorsOut;
//   - any other error is wrapped in a LoaderError carrying the batch and the
//     error text, and sent on chErrorsOut.
//
// Every send is abandoned, and Exec returns, if ctx is cancelled first.
//
// NOTE(review): wgActiveBatches.Done is NOT called on the failure path.
// Presumably whoever consumes the error channels takes over responsibility for
// the batch (retry or Done) — confirm against the caller.
func (gl *GenericLoader) Exec(
	ctx context.Context,
	tableInfo config.TargetTableInfo,
	columns []models.ColumnType,
	chBatchesIn <-chan models.Batch,
	chErrorsOut chan<- custom_errors.LoaderError,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActiveBatches *sync.WaitGroup,
	rowsLoaded *int64,
) {
	colNames := mapSlice(columns, func(col models.ColumnType) string {
		return col.Name()
	})

	for {
		// select chooses at random among ready cases, so check cancellation
		// explicitly first to avoid picking up a new batch after ctx is done.
		if ctx.Err() != nil {
			return
		}

		select {
		case <-ctx.Done():
			return
		case batch, ok := <-chBatchesIn:
			if !ok {
				// Input channel closed: no more work.
				return
			}

			processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
			if err == nil {
				// Publish the row count before signalling completion; the
				// reverse order lets a waiter read a stale total.
				atomic.AddInt64(rowsLoaded, int64(processedRows))
				wgActiveBatches.Done()
				continue
			}

			var ldError *custom_errors.LoaderError
			var jobError *custom_errors.JobError
			switch {
			case errors.As(err, &ldError):
				select {
				case <-ctx.Done():
					return
				case chErrorsOut <- *ldError:
				}
			case errors.As(err, &jobError):
				select {
				case <-ctx.Done():
					return
				case chJobErrorsOut <- *jobError:
				}
			default:
				select {
				case <-ctx.Done():
					return
				case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
				}
			}
		}
	}
}