package loaders

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/jackc/pgx/v5/pgconn"
)

// GenericLoader stores batches of rows in a target table through a DbWrapper.
type GenericLoader struct {
	db dbwrapper.DbWrapper
}

// NewGenericLoader returns an etl.Loader that writes through db.
func NewGenericLoader(db dbwrapper.DbWrapper) etl.Loader {
	return &GenericLoader{db: db}
}

// ProcessBatch saves batch.Rows into tableInfo.Schema.tableInfo.Table using
// colNames as the column list.
//
// On success it returns len(batch.Rows) and a nil error. On failure it returns
// 0 and one of:
//   - *custom_errors.JobError with ShouldCancelJob set, when the underlying
//     error is a *pgconn.PgError carrying SQLSTATE 23505;
//   - *custom_errors.LoaderError carrying the batch and the error text, for
//     every other failure.
func (gl *GenericLoader) ProcessBatch(
	ctx context.Context,
	tableInfo config.TargetTableInfo,
	colNames []string,
	batch models.Batch,
) (int, error) {
	// SQLSTATE 23505 is PostgreSQL's unique_violation.
	const uniqueViolation = "23505"

	// The first value returned by SaveMassive is deliberately unused; the
	// reported row count is the size of the batch that was handed over.
	_, err := gl.db.SaveMassive(ctx, tableInfo.Schema, tableInfo.Table, colNames, batch.Rows)
	if err == nil {
		return len(batch.Rows), nil
	}

	var pgErr *pgconn.PgError
	if errors.As(err, &pgErr) && pgErr.Code == uniqueViolation {
		return 0, &custom_errors.JobError{
			ShouldCancelJob: true,
			Msg:             fmt.Sprintf("Fatal error in table %s.%s", tableInfo.Schema, tableInfo.Table),
			Prev:            err,
		}
	}

	return 0, &custom_errors.LoaderError{Batch: batch, Msg: err.Error()}
}

// Exec receives batches from chBatchesIn and stores each one with ProcessBatch
// until the channel is closed or ctx is cancelled.
//
// For every batch stored successfully it adds the number of rows to
// *rowsLoaded and then calls wgActiveBatches.Done(). When ProcessBatch fails,
// the error is forwarded instead: a JobError goes to chJobErrorsOut, anything
// else goes to chErrorsOut as a LoaderError. Every send is abandoned if ctx is
// cancelled first, in which case Exec returns.
func (gl *GenericLoader) Exec(
	ctx context.Context,
	tableInfo config.TargetTableInfo,
	columns []models.ColumnType,
	chBatchesIn <-chan models.Batch,
	chErrorsOut chan<- custom_errors.LoaderError,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActiveBatches *sync.WaitGroup,
	rowsLoaded *int64,
) {
	colNames := mapSlice(columns, func(col models.ColumnType) string {
		return col.Name()
	})

	for {
		// select picks randomly among ready cases, so check cancellation
		// explicitly first to avoid starting a batch after ctx is done.
		if ctx.Err() != nil {
			return
		}

		select {
		case <-ctx.Done():
			return

		case batch, ok := <-chBatchesIn:
			if !ok {
				return
			}

			processedRows, err := gl.ProcessBatch(ctx, tableInfo, colNames, batch)
			if err != nil {
				var (
					ldError  *custom_errors.LoaderError
					jobError *custom_errors.JobError
				)

				switch {
				case errors.As(err, &ldError):
					select {
					case <-ctx.Done():
						return
					case chErrorsOut <- *ldError:
					}
				case errors.As(err, &jobError):
					select {
					case <-ctx.Done():
						return
					case chJobErrorsOut <- *jobError:
					}
				default:
					// Unknown error type: report it as a loader error for this batch.
					select {
					case <-ctx.Done():
						return
					case chErrorsOut <- custom_errors.LoaderError{Batch: batch, Msg: err.Error()}:
					}
				}

				// NOTE(review): a failed batch is not marked done on
				// wgActiveBatches here; presumably the consumer of the error
				// channels takes over its completion — confirm with the caller.
				continue
			}

			// Publish the row count before releasing the WaitGroup so that a
			// goroutine resuming from wgActiveBatches.Wait() already observes
			// this batch in *rowsLoaded.
			atomic.AddInt64(rowsLoaded, int64(processedRows))
			wgActiveBatches.Done()
		}
	}
}