feat: implement MSSQL extractor, transformer, and Postgres loader for enhanced data migration
This commit is contained in:
128
internal/app/etl/extractors/postgres.go
Normal file
128
internal/app/etl/extractors/postgres.go
Normal file
@@ -0,0 +1,128 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
type PostgresExtractor struct {
|
||||
db *pgxpool.Pool
|
||||
}
|
||||
|
||||
func NewPostgresExtractor(pool *pgxpool.Pool) etl.Extractor {
|
||||
return &PostgresExtractor{db: pool}
|
||||
}
|
||||
|
||||
func buildExtractQueryPostgres(sourceDbInfo config.SourceTableInfo, columns []models.ColumnType) string {
|
||||
var sbColumns strings.Builder
|
||||
|
||||
if len(columns) == 0 {
|
||||
sbColumns.WriteString("*")
|
||||
} else {
|
||||
for i, col := range columns {
|
||||
if col.Type() == "GEOMETRY" {
|
||||
sbColumns.WriteString(`ST_AsEWKB("`)
|
||||
sbColumns.WriteString(col.Name())
|
||||
sbColumns.WriteString(`") AS "`)
|
||||
sbColumns.WriteString(col.Name())
|
||||
sbColumns.WriteString(`"`)
|
||||
} else {
|
||||
sbColumns.WriteString(`"`)
|
||||
sbColumns.WriteString(col.Name())
|
||||
sbColumns.WriteString(`"`)
|
||||
}
|
||||
|
||||
if i < len(columns)-1 {
|
||||
sbColumns.WriteString(", ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Sprintf(`SELECT %s FROM "%s"."%s" ORDER BY "%s" ASC`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table, sourceDbInfo.PrimaryKey)
|
||||
}
|
||||
|
||||
func (postgresEx *PostgresExtractor) ProcessBatch(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
chunkSize int,
|
||||
batch models.Batch,
|
||||
indexPrimaryKey int,
|
||||
chChunksOut chan<- models.Chunk,
|
||||
rowsRead *int64,
|
||||
) error {
|
||||
query := buildExtractQueryPostgres(tableInfo, columns)
|
||||
|
||||
if batch.ShouldUseRange {
|
||||
return errors.New("Batch config not yet supported")
|
||||
}
|
||||
|
||||
rows, err := postgresEx.db.Query(ctx, query)
|
||||
if err != nil {
|
||||
return &custom_errors.ExtractorError{Batch: batch, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
rowsChunk := make([]models.UnknownRowValues, 0, chunkSize)
|
||||
|
||||
for rows.Next() {
|
||||
values, err := rows.Values()
|
||||
if err != nil {
|
||||
return errors.New("Unexpected error reading rows from source")
|
||||
}
|
||||
|
||||
rowsChunk = append(rowsChunk, values)
|
||||
|
||||
if len(rowsChunk) >= chunkSize {
|
||||
select {
|
||||
case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
|
||||
atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
|
||||
rowsChunk = make([]models.UnknownRowValues, 0, chunkSize)
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return errors.New("Unexpected error reading rows from source")
|
||||
}
|
||||
|
||||
if len(rowsChunk) > 0 {
|
||||
select {
|
||||
case chChunksOut <- models.Chunk{Id: uuid.New(), BatchId: batch.Id, Data: rowsChunk, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
|
||||
atomic.AddInt64(rowsRead, int64(len(rowsChunk)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (postgresEx *PostgresExtractor) Exec(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
chunkSize int,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chChunksOut chan<- models.Chunk,
|
||||
chErrorsOut chan<- custom_errors.ExtractorError,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
rowsRead *int64,
|
||||
) {
|
||||
}
|
||||
Reference in New Issue
Block a user