package extractors

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"

	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
	"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgxpool"
)

// PostgresExtractor reads rows from a PostgreSQL source through a pgx
// connection pool.
type PostgresExtractor struct {
	db *pgxpool.Pool
}

// NewPostgresExtractor returns an etl.Extractor that runs its queries on pool.
func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor {
	return &PostgresExtractor{db: pool}
}

// quotePostgresIdentifier returns name as a double-quoted SQL identifier.
// Embedded double quotes are doubled so that the name cannot terminate the
// quoted identifier early and alter the surrounding statement.
func quotePostgresIdentifier(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// buildExtractQueryPostgres builds the SELECT statement used to read a source
// table.
//
// When columns is empty every column is selected with "*". Otherwise each
// column is selected by its quoted name; columns whose Type() is "GEOMETRY"
// are wrapped in ST_AsEWKB and aliased back to their own name. Rows are
// ordered ascending by sourceDbInfo.PrimaryKey.
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
	selectList := "*"
	if len(columns) > 0 {
		exprs := make([]string, 0, len(columns))
		for _, col := range columns {
			ident := quotePostgresIdentifier(col.Name())
			if col.Type() == "GEOMETRY" {
				exprs = append(exprs, "ST_AsEWKB("+ident+") AS "+ident)
				continue
			}
			exprs = append(exprs, ident)
		}
		selectList = strings.Join(exprs, ", ")
	}

	return fmt.Sprintf(
		"SELECT %s FROM %s.%s ORDER BY %s ASC",
		selectList,
		quotePostgresIdentifier(sourceDbInfo.Schema),
		quotePostgresIdentifier(sourceDbInfo.Table),
		quotePostgresIdentifier(sourceDbInfo.PrimaryKey),
	)
}

// ProcessPartition runs the extract query for tableInfo and streams the
// result to chBatchesOut in batches of at most batchSize rows. A batchSize
// below 1 is treated as 1. After each batch is handed over, its row count is
// atomically added to *rowsRead.
//
// Partitions with ShouldUseRange set are rejected with an error before any
// query is issued. A failure to start the query is returned as a
// *custom_errors.ExtractorError carrying the partition; failures while reading
// rows are returned as errors that wrap the underlying cause.
//
// If ctx is cancelled while a batch is waiting to be sent, the pending batch
// is dropped and nil is returned.
// NOTE(review): cancellation is reported as success here, so callers that
// need to tell the two apart must inspect ctx themselves — confirm this is
// intended.
//
// indexPrimaryKey is not used by this implementation.
func (e *PostgresExtractor) ProcessPartition(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	partition models.Partition,
	indexPrimaryKey int,
	chBatchesOut chan<- models.Batch,
	rowsRead *int64,
) error {
	// Reject unsupported partitions first so no query is built for them.
	if partition.ShouldUseRange {
		return errors.New("Batch config not yet supported")
	}

	// A negative capacity would make make() panic; a size of zero already
	// behaved like one row per batch, so clamp everything below 1 to 1.
	if batchSize < 1 {
		batchSize = 1
	}

	// flush hands out to the consumer as a single batch and updates the row
	// counter. It reports false when ctx was cancelled before the send.
	flush := func(out []models.UnknownRowValues) bool {
		batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: out, RetryCounter: 0}
		select {
		case chBatchesOut <- batch:
		case <-ctx.Done():
			return false
		}
		atomic.AddInt64(rowsRead, int64(len(out)))
		return true
	}

	query := buildExtractQueryPostgres(tableInfo, columns)
	rows, err := e.db.Query(ctx, query)
	if err != nil {
		return &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
	}
	defer rows.Close()

	pending := make([]models.UnknownRowValues, 0, batchSize)
	for rows.Next() {
		values, err := rows.Values()
		if err != nil {
			return fmt.Errorf("Unexpected error reading rows from source: %w", err)
		}

		pending = append(pending, values)
		if len(pending) < batchSize {
			continue
		}

		if !flush(pending) {
			return nil
		}
		// The sent slice now belongs to the receiver; start a fresh one.
		pending = make([]models.UnknownRowValues, 0, batchSize)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("Unexpected error reading rows from source: %w", err)
	}

	// Send the final, partially filled batch. The result is irrelevant:
	// both a successful send and a cancellation end with nil.
	if len(pending) > 0 {
		flush(pending)
	}
	return nil
}

// Exec is currently a no-op: its body is empty and none of the parameters
// are used.
func (e *PostgresExtractor) Exec(
	ctx context.Context,
	tableInfo config.SourceTableInfo,
	columns []models.ColumnType,
	batchSize int,
	chPartitionsIn <-chan models.Partition,
	chBatchesOut chan<- models.Batch,
	chErrorsOut chan<- custom_errors.ExtractorError,
	chJobErrorsOut chan<- custom_errors.JobError,
	wgActivePartitions *sync.WaitGroup,
	rowsRead *int64,
) {
}