package extractor

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgxpool"
)
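
// PostgresExtractor extracts rows from a PostgreSQL source database through a
// pgx connection pool.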
type PostgresExtractor struct {
	db *pgxpool.Pool
}
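
// NewPostgresExtractor wraps pool in a PostgresExtractor, returned behind the
// package's Extractor interface.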
func NewPostgresExtractor(pool *pgxpool.Pool) Extractor {
	return &PostgresExtractor{db: pool}
}
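
// buildExtractQueryPostgres builds the SELECT statement for the source table:
// identifiers are double-quoted, GEOMETRY columns are wrapped in ST_AsEWKB so
// their values come back as EWKB bytes, and rows are ordered by the primary
// key. An empty column list falls back to SELECT *.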
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
	var sbColumns strings.Builder

	if len(columns) == 0 {
		sbColumns.WriteString("*")
	} else {
		for i, col := range columns {
			if col.Type() == "GEOMETRY" {
				sbColumns.WriteString(`ST_AsEWKB("`)
				sbColumns.WriteString(col.Name())
				sbColumns.WriteString(`") AS "`)
				sbColumns.WriteString(col.Name())
				sbColumns.WriteString(`"`)
			} else {
				sbColumns.WriteString(`"`)
				sbColumns.WriteString(col.Name())
				sbColumns.WriteString(`"`)
			}

			if i < len(columns)-1 {
				sbColumns.WriteString(", ")
			}
		}
	}

	return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
}
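
// ProcessBatch runs the extract query, reads rows into chunks of chunkSize,
// and sends each chunk on chChunksOut, bumping rowsRead atomically per chunk
// handed off. Context cancellation ends the batch early without an error.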
func (postgresEx *PostgresExtractor) ProcessBatch(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	chunkSize int,
	batch models.Batch,
	indexPrimaryKey int,
	chChunksOut chan<- models.Chunk,
	rowsRead *int64,
) error {
	query := buildExtractQueryPostgres(tableInfo, columns)

	if batch.ShouldUseRange {
		return errors.New("batch config not yet supported")
	}

	rows, err := postgresEx.db.Query(ctx, query)
	if err != nil {
		return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
	}
	defer rows.Close()

	rowsChunk := make([]models.UnknownRowValues, 0, chunkSize)

	for rows.Next() {
		values, err := rows.Values()
		if err != nil {
			return fmt.Errorf("unexpected error reading rows from source: %w", err)
		}

		rowsChunk = append(rowsChunk, values)

		// Hand off a full chunk, or stop early if the job was cancelled.
		if len(rowsChunk) >= chunkSize {
			select {
			case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
			case <-ctx.Done():
				return nil
			}

			atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
			rowsChunk = make([]models.UnknownRowValues, 0, chunkSize)
		}
	}

	if err := rows.Err(); err != nil {
		return fmt.Errorf("unexpected error reading rows from source: %w", err)
	}

	// Flush the final, partially filled chunk.
	if len(rowsChunk) > 0 {
		select {
		case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
		case <-ctx.Done():
			return nil
		}

		atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
	}

	return nil
}
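
// Exec is the extractor's long-running pipeline stage; its body is not
// implemented yet. Presumably it will consume batches from chBatchesIn,
// delegate each one to ProcessBatch, and route failures to chErrorsOut or
// chJobErrorsOut while tracking in-flight batches with wgActiveBatches.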
func (postgresEx *PostgresExtractor) Exec(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	chunkSize int,
	chBatchesIn <-chan models.Batch,
	chChunksOut chan<- models.Chunk,
	chErrorsOut chan<- custom_errors.ExtractorError,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActiveBatches *sync.WaitGroup,
	rowsRead *int64,
) {
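	// Not implemented yet. A minimal sketch of the intended loop, kept as a
	// comment because the error routing, the indexPrimaryKey value passed to
	// ProcessBatch, and the JobError field are assumptions, not part of this
	// file:
	//
	//	for batch := range chBatchesIn {
	//		err := postgresEx.ProcessBatch(ctx, tableInfo, columns, chunkSize,
	//			batch, 0 /* hypothetical indexPrimaryKey */, chChunksOut, rowsRead)
	//		var exErr *custom_errors.ExtractorError
	//		switch {
	//		case errors.As(err, &exErr):
	//			chErrorsOut <- *exErr // retryable batch failure
	//		case err != nil:
	//			chJobErrorsOut <- custom_errors.JobError{Msg: err.Error()} // hypothetical field
	//		}
	//		wgActiveBatches.Done()
	//	}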
}