111 lines
2.8 KiB
Go
111 lines
2.8 KiB
Go
package extractors
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
type PostgresExtractor struct {
|
|
db dbwrapper.DbWrapper
|
|
}
|
|
|
|
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
|
|
return &PostgresExtractor{db: db}
|
|
}
|
|
|
|
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
|
|
var sbColumns strings.Builder
|
|
|
|
if len(columns) == 0 {
|
|
sbColumns.WriteString("*")
|
|
} else {
|
|
for i, col := range columns {
|
|
if col.Type() == "GEOMETRY" {
|
|
sbColumns.WriteString(`ST_AsEWKB("`)
|
|
sbColumns.WriteString(col.Name())
|
|
sbColumns.WriteString(`") AS "`)
|
|
sbColumns.WriteString(col.Name())
|
|
sbColumns.WriteString(`"`)
|
|
} else {
|
|
sbColumns.WriteString(`"`)
|
|
sbColumns.WriteString(col.Name())
|
|
sbColumns.WriteString(`"`)
|
|
}
|
|
|
|
if i < len(columns)-1 {
|
|
sbColumns.WriteString(", ")
|
|
}
|
|
}
|
|
}
|
|
|
|
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
|
|
}
|
|
|
|
func (postgresEx *PostgresExtractor) Exec(
|
|
ctx context.Context,
|
|
tableInfo config.SourceTableInfo,
|
|
columns []models.ColumnType,
|
|
batchSize int,
|
|
partition models.Partition,
|
|
indexPrimaryKey int,
|
|
chBatchesOut chan<- models.Batch,
|
|
) (int64, error) {
|
|
query := buildExtractQueryPostgres(tableInfo, columns)
|
|
|
|
if partition.HasRange {
|
|
return 0, errors.New("Batch config not yet supported")
|
|
}
|
|
|
|
var rowsRead int64 = 0
|
|
rows, err := postgresEx.db.Query(ctx, query)
|
|
if err != nil {
|
|
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
|
}
|
|
defer rows.Close()
|
|
|
|
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
|
|
|
for rows.Next() {
|
|
values, err := rows.Values()
|
|
if err != nil {
|
|
return rowsRead, errors.New("Unexpected error reading rows from source")
|
|
}
|
|
rowsRead++
|
|
|
|
batchRows = append(batchRows, values)
|
|
|
|
if len(batchRows) >= batchSize {
|
|
select {
|
|
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
|
case <-ctx.Done():
|
|
return rowsRead, ctx.Err()
|
|
}
|
|
|
|
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
|
}
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return rowsRead, errors.New("Unexpected error reading rows from source")
|
|
}
|
|
|
|
if len(batchRows) > 0 {
|
|
select {
|
|
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
|
case <-ctx.Done():
|
|
return rowsRead, nil
|
|
}
|
|
}
|
|
|
|
return rowsRead, nil
|
|
}
|