10 Commits

20 changed files with 538 additions and 608 deletions

View File

@@ -118,7 +118,7 @@ func processMigrationJobs(
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
extractor := extractors.NewMssqlExtractor(sourceDb)
extractor := extractors.NewExtractor(sourceDb)
loader := loaders.NewGenericLoader(targetDb)
var azureClient *azure.Client

View File

@@ -39,7 +39,7 @@ func processMigrationJob(
targetDbWrapper dbwrapper.DbWrapper,
sourceTableAnalyzer etl.TableAnalyzer,
targetTableAnalyzer etl.TableAnalyzer,
extractor etl.Extractor,
extractor extractors.GenericExtractor,
azureClient *azure.Client,
loader etl.Loader,
job config.Job,
@@ -110,12 +110,11 @@ func processMigrationJob(
log.Error("Unexpected error calculating batch ranges: ", err)
}
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
chPartitions := make(chan models.Partition, job.QueueSize)
chBatchesRaw := make(chan models.Batch, job.QueueSize)
chBatchesTransformed := make(chan models.Batch, job.QueueSize)
chJobErrors := make(chan custom_errors.JobError, job.ExtractorQueueSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
chBatchesTransformed := make(chan models.Batch, job.ExtractorQueueSize)
var wgActivePartitions sync.WaitGroup
var wgActiveBatches sync.WaitGroup
@@ -131,19 +130,10 @@ func processMigrationJob(
}
}()
go custom_errors.ExtractorErrorHandler(
localCtx,
job.Retry,
job.MaxPartitionErrrors,
chExtractorErrors,
chPartitions,
chJobErrors,
&wgActivePartitions,
)
go custom_errors.LoaderErrorHandler(
localCtx,
job.Retry,
job.MaxChunkErrors,
job.MaxExtractorBatchErrors,
chLoadersErrors,
chBatchesTransformed,
chJobErrors,
@@ -155,15 +145,14 @@ func processMigrationJob(
for range maxExtractors {
wgExtractors.Go(func() {
extractors.Consume(
extractor.Consume(
localCtx,
extractor,
job.SourceTable,
sourceColTypes,
job.BatchSize,
job.ExtractorBatchSize,
job.Retry,
chPartitions,
chBatchesRaw,
chExtractorErrors,
chJobErrors,
&wgActivePartitions,
&rowsRead,
@@ -217,8 +206,6 @@ func processMigrationJob(
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
close(chPartitions)
log.Debugf("chPartitions is closed (%v)", job.Name)
close(chExtractorErrors)
log.Debugf("chExtractorErrors is closed (%v)", job.Name)
wgExtractors.Wait()
log.Debugf("wgExtractors is empty (%v)", job.Name)

View File

@@ -3,15 +3,19 @@ source_db_type: sqlserver
target_db_type: postgres
defaults:
max_extractors: 2
max_loaders: 4
queue_size: 8
batch_size: 25000
batches_per_partition: 8
max_extractors: 2
extractor_batch_size: 25000
extractor_queue_size: 8
max_transformers: 2
transformer_batch_size: 25000
transformer_queue_size: 8
max_loaders: 4
loader_batch_size: 25000
truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE
max_partition_errrors: 5
max_chunk_errors: 5
max_extractor_batch_errors: 5
retry:
attempts: 3
base_delay_ms: 500

View File

@@ -25,15 +25,19 @@ type ToStorageConfig struct {
}
type JobConfig struct {
MaxExtractors int `yaml:"max_extractors"`
MaxLoaders int `yaml:"max_loaders"`
QueueSize int `yaml:"queue_size"`
BatchSize int `yaml:"batch_size"`
BatchesPerPartition int `yaml:"batches_per_partition"`
MaxExtractors int `yaml:"max_extractors"`
ExtractorBatchSize int `yaml:"extractor_batch_size"`
ExtractorQueueSize int `yaml:"extractor_queue_size"`
MaxTransformers int `yaml:"max_transformers"`
TransformerBatchSize int `yaml:"transformer_batch_size"`
TransformerQueueSize int `yaml:"transformer_queue_size"`
MaxLoaders int `yaml:"max_loaders"`
LoaderBatchSize int `yaml:"loader_batch_size"`
TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"`
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
MaxChunkErrors int `yaml:"max_chunk_errors"`
MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"`
Retry RetryConfig `yaml:"retry"`
RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
@@ -97,7 +101,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
c.Defaults = raw.Defaults
c.SourceDbType = raw.SourceDbType
c.TargetDbType = raw.TargetDbType
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
c.Defaults.RowsPerPartition = int64(raw.Defaults.ExtractorBatchSize * raw.Defaults.BatchesPerPartition)
for _, node := range raw.Jobs {
job := Job{
@@ -108,7 +112,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
return err
}
job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition)
job.RowsPerPartition = int64(job.ExtractorBatchSize * job.BatchesPerPartition)
c.Jobs = append(c.Jobs, job)
}

View File

@@ -6,7 +6,7 @@ import (
"time"
)
func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
if retryCounter < 0 {
retryCounter = 0
}

View File

@@ -1,13 +1,7 @@
package custom_errors
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type ExtractorError struct {
@@ -20,100 +14,3 @@ type ExtractorError struct {
func (e *ExtractorError) Error() string {
return e.Msg
}
func ExtractorErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxPartitionErrors int,
chErrorsIn <-chan ExtractorError,
chPartitionsOut chan<- models.Partition,
chJobErrorsOut chan<- JobError,
wgActivePartitions *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Partition.RetryCounter >= retryConfig.Attempts {
wgActivePartitions.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
newPartition := err.Partition
newPartition.RetryCounter++
delay := computeBackoffDelay(
newPartition.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
if err.HasLastId {
newPartition.ParentId = err.Partition.Id
newPartition.Id = uuid.New()
newPartition.Range.Min = err.LastId
newPartition.Range.IsMinInclusive = false
}
requeueWithBackoff(ctx, delay, func() {
select {
case chPartitionsOut <- newPartition:
case <-ctx.Done():
return
}
})
}
}
}

View File

@@ -88,7 +88,7 @@ func LoaderErrorHandler(
}
err.Batch.RetryCounter++
delay := computeBackoffDelay(
delay := ComputeBackoffDelay(
err.Batch.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,

View File

@@ -0,0 +1,7 @@
package db_dialects
const (
SqlServer string = "sqlserver"
Postgres string = "postgres"
Null string = "null"
)

View File

@@ -4,13 +4,15 @@ import (
"context"
"database/sql"
"fmt"
"strings"
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
mssql "github.com/microsoft/go-mssqldb"
)
func init() {
Register("sqlserver", func() DbWrapper {
return &mssqlDbWrapper{dialect: "sqlserver"}
Register(dbdialects.SqlServer, func() DbWrapper {
return &mssqlDbWrapper{dialect: dbdialects.SqlServer}
})
}
@@ -174,3 +176,74 @@ func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table
return rowsAffected, nil
}
func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
var sbQuery strings.Builder
sbQuery.WriteString("SELECT ")
if len(q.Columns) == 0 {
sbQuery.WriteString("*")
} else {
for i, col := range q.Columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
switch col.Type() {
case "GEOMETRY":
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
}
if i < len(q.Columns)-1 {
sbQuery.WriteString(", ")
}
}
}
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", q.Schema, q.Table)
if q.LowerLimit.IsValid || q.UpperLimit.IsValid {
sbQuery.WriteString(" WHERE ")
if q.LowerLimit.IsValid {
fmt.Fprintf(&sbQuery, "[%s]", q.PrimaryKey)
if q.LowerLimit.IsInclusive {
sbQuery.WriteString(" >=")
} else {
sbQuery.WriteString(" >")
}
sbQuery.WriteString(" @min")
}
if q.LowerLimit.IsValid && q.UpperLimit.IsValid {
sbQuery.WriteString(" AND ")
}
if q.UpperLimit.IsValid {
fmt.Fprintf(&sbQuery, "[%s]", q.PrimaryKey)
if q.UpperLimit.IsInclusive {
sbQuery.WriteString(" <=")
} else {
sbQuery.WriteString(" <")
}
sbQuery.WriteString(" @max")
}
}
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", q.PrimaryKey)
queryString := sbQuery.String()
// logrus.Debugf("Query: %s", queryString)
var queryArgs []any
if q.LowerLimit.IsValid {
queryArgs = append(queryArgs, sql.Named("min", q.LowerLimit.Value))
}
if q.UpperLimit.IsValid {
queryArgs = append(queryArgs, sql.Named("max", q.UpperLimit.Value))
}
return mw.Query(ctx, queryString, queryArgs...)
}

View File

@@ -3,14 +3,17 @@ package dbwrapper
import (
"context"
"errors"
"fmt"
"strings"
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func init() {
Register("postgres", func() DbWrapper {
return &postgresDbWrapper{dialect: "postgres"}
Register(dbdialects.Postgres, func() DbWrapper {
return &postgresDbWrapper{dialect: dbdialects.Postgres}
})
}
@@ -126,3 +129,75 @@ func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, tab
return affectedRows, nil
}
func (pw *postgresDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
var sbQuery strings.Builder
sbQuery.WriteString("SELECT ")
if len(q.Columns) == 0 {
sbQuery.WriteString("*")
} else {
for i, col := range q.Columns {
switch col.Type() {
case "GEOMETRY":
fmt.Fprintf(&sbQuery, `ST_AsEWKB("%s") AS "%s"`, col.Name(), col.Name())
default:
fmt.Fprintf(&sbQuery, `"%s"`, col.Name())
}
if i < len(q.Columns)-1 {
sbQuery.WriteString(", ")
}
}
}
fmt.Fprintf(&sbQuery, ` FROM "%s"."%s"`, q.Schema, q.Table)
if q.LowerLimit.IsValid || q.UpperLimit.IsValid {
sbQuery.WriteString(" WHERE ")
paramIdx := 1
if q.LowerLimit.IsValid {
fmt.Fprintf(&sbQuery, `"%s"`, q.PrimaryKey)
if q.LowerLimit.IsInclusive {
sbQuery.WriteString(" >=")
} else {
sbQuery.WriteString(" >")
}
fmt.Fprintf(&sbQuery, " $%d", paramIdx)
paramIdx++
}
if q.LowerLimit.IsValid && q.UpperLimit.IsValid {
sbQuery.WriteString(" AND ")
}
if q.UpperLimit.IsValid {
fmt.Fprintf(&sbQuery, `"%s"`, q.PrimaryKey)
if q.UpperLimit.IsInclusive {
sbQuery.WriteString(" <=")
} else {
sbQuery.WriteString(" <")
}
fmt.Fprintf(&sbQuery, " $%d", paramIdx)
paramIdx++
}
}
fmt.Fprintf(&sbQuery, ` ORDER BY "%s" ASC`, q.PrimaryKey)
queryString := sbQuery.String()
var queryArgs []any
if q.LowerLimit.IsValid {
queryArgs = append(queryArgs, q.LowerLimit.Value)
}
if q.UpperLimit.IsValid {
queryArgs = append(queryArgs, q.UpperLimit.Value)
}
return pw.Query(ctx, queryString, queryArgs...)
}

View File

@@ -3,6 +3,8 @@ package dbwrapper
import (
"context"
"errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
@@ -24,6 +26,21 @@ type RowResult interface {
Scan(dest ...any) error
}
type ExtractorQueryLimit struct {
IsValid bool
IsInclusive bool
Value int64
}
type ExtractionQuery struct {
Schema string
Table string
PrimaryKey string
Columns []models.ColumnType
LowerLimit ExtractorQueryLimit
UpperLimit ExtractorQueryLimit
}
type DbWrapper interface {
Close() error
Connect(ctx context.Context, dbUrl string) error
@@ -32,4 +49,5 @@ type DbWrapper interface {
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
QueryRow(ctx context.Context, query string, args ...any) RowResult
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
QueryFromObject(ctx context.Context, query ExtractionQuery) (RowsResult, error)
}

View File

@@ -0,0 +1,94 @@
package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/sirupsen/logrus"
)
func (ex *GenericExtractor) Consume(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
retryConfig config.RetryConfig,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
if indexPrimaryKey == -1 {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := ex.ProcessPartitionWithRetries(
ctx,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
retryConfig,
chBatchesOut,
)
wgActivePartitions.Done()
if rowsReadResult > 0 {
current := atomic.LoadInt64(rowsRead)
logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return
case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}
}
}
}
}

View File

@@ -2,100 +2,41 @@ package extractors
import (
"context"
"errors"
"slices"
"strings"
"sync"
"sync/atomic"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func Consume(
ctx context.Context,
extractor etl.Extractor,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
chPartitionsIn <-chan models.Partition,
chBatchesOut chan<- models.Batch,
chErrorsOut chan<- custom_errors.ExtractorError,
chJobErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup,
rowsRead *int64,
) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
})
type GenericExtractor struct {
db dbwrapper.DbWrapper
}
if indexPrimaryKey == -1 {
func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor {
return GenericExtractor{db: db}
}
func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error {
select {
case chBatchesOut <- batch:
return nil
case <-ctx.Done():
return
case chJobErrorsOut <- custom_errors.JobError{
ShouldCancelJob: true,
Msg: "Primary key not found in provided columns",
}:
}
return
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case partition, ok := <-chPartitionsIn:
if !ok {
return
}
rowsReadResult, err := extractor.Exec(
ctx,
tableInfo,
columns,
batchSize,
partition,
indexPrimaryKey,
chBatchesOut,
)
if rowsReadResult > 0 {
atomic.AddInt64(rowsRead, int64(rowsReadResult))
}
if err != nil {
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
select {
case <-ctx.Done():
return
case chErrorsOut <- *exError:
}
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return
case chJobErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
}
}
continue
}
wgActivePartitions.Done()
}
return ctx.Err()
}
}
func flush(
ctx context.Context,
partition *models.Partition,
batchSize int,
batchRows []models.UnknownRowValues,
chBatchesOut chan<- models.Batch,
) error {
if len(batchRows) == 0 {
return nil
}
batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
return sendBatch(ctx, chBatchesOut, batch)
}

View File

@@ -1,209 +0,0 @@
package extractors
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type MssqlExtractor struct {
db dbwrapper.DbWrapper
}
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &MssqlExtractor{db: db}
}
func buildExtractQueryMssql(
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
includeRange bool,
isMinInclusive bool,
isMaxInclusive bool,
hasMin bool,
hasMax bool,
) string {
var sbQuery strings.Builder
sbQuery.WriteString("SELECT ")
if len(columns) == 0 {
sbQuery.WriteString("*")
} else {
for i, col := range columns {
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
if col.Type() == "GEOMETRY" {
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
}
if i < len(columns)-1 {
sbQuery.WriteString(", ")
}
}
}
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
if includeRange && (hasMin || hasMax) {
sbQuery.WriteString(" WHERE ")
if hasMin {
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
if isMinInclusive {
sbQuery.WriteString(" >=")
} else {
sbQuery.WriteString(" >")
}
sbQuery.WriteString(" @min")
}
if hasMin && hasMax {
sbQuery.WriteString(" AND ")
}
if hasMax {
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
if isMaxInclusive {
sbQuery.WriteString(" <=")
} else {
sbQuery.WriteString(" <")
}
sbQuery.WriteString(" @max")
}
}
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
return sbQuery.String()
}
func errorFromLastRow(
lastRow models.UnknownRowValues,
indexPrimaryKey int,
partition models.Partition,
previousError error,
) *custom_errors.ExtractorError {
lastIdRawValue := lastRow[indexPrimaryKey]
lastId, ok := convert.ToInt64(lastIdRawValue)
if !ok {
currentPartition := partition
currentPartition.RetryCounter = 3
return &custom_errors.ExtractorError{
Partition: currentPartition,
HasLastId: true,
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
}
}
return &custom_errors.ExtractorError{
Partition: partition,
HasLastId: true,
LastId: lastId,
Msg: previousError.Error(),
}
}
func (mssqlEx *MssqlExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error) {
hasMin := partition.HasRange && partition.Range.Min > 0
hasMax := partition.HasRange && partition.Range.Max > 0
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
var queryArgs []any
if hasMin {
queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min))
}
if hasMax {
queryArgs = append(queryArgs, sql.Named("max", partition.Range.Max))
}
rowsRead := 0
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
defer rows.Close()
batchRows := make([]models.UnknownRowValues, 0, batchSize)
for rows.Next() {
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := rows.Scan(scanArgs...); err != nil {
if len(batchRows) == 0 {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
lastRow := batchRows[len(batchRows)-1]
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
}
rowsRead++
batchRows = append(batchRows, rowValues)
if len(batchRows) >= batchSize {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
}
}
if err := rows.Err(); err != nil {
if errors.Is(err, ctx.Err()) {
return rowsRead, ctx.Err()
}
if len(batchRows) > 0 {
lastRow := batchRows[len(batchRows)-1]
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
}
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
if len(batchRows) > 0 {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
}
return rowsRead, nil
}

View File

@@ -1,157 +0,0 @@
package extractors
import (
"context"
"fmt"
"strings"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
type PostgresExtractor struct {
db dbwrapper.DbWrapper
}
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
return &PostgresExtractor{db: db}
}
func buildExtractQueryPostgres(
sourceDbInfo config.SourceTableInfo,
columns []models.ColumnType,
includeRange bool,
isMinInclusive bool,
isMaxInclusive bool,
hasMin bool,
hasMax bool,
) string {
var sbColumns strings.Builder
if len(columns) == 0 {
sbColumns.WriteString("*")
} else {
for i, col := range columns {
if col.Type() == "GEOMETRY" {
sbColumns.WriteString(`ST_AsEWKB("`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`") AS "`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`"`)
} else {
sbColumns.WriteString(`"`)
sbColumns.WriteString(col.Name())
sbColumns.WriteString(`"`)
}
if i < len(columns)-1 {
sbColumns.WriteString(", ")
}
}
}
query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table)
if includeRange && (hasMin || hasMax) {
query += " WHERE "
paramIdx := 1
if hasMin {
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
if isMinInclusive {
query += " >="
} else {
query += " >"
}
query += fmt.Sprintf(" $%d", paramIdx)
paramIdx++
}
if hasMin && hasMax {
query += " AND "
}
if hasMax {
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
if isMaxInclusive {
query += " <="
} else {
query += " <"
}
query += fmt.Sprintf(" $%d", paramIdx)
}
}
query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey)
return query
}
func (postgresEx *PostgresExtractor) Exec(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error) {
hasMin := partition.HasRange && partition.Range.Min > 0
hasMax := partition.HasRange && partition.Range.Max > 0
query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
var queryArgs []any
if hasMin {
queryArgs = append(queryArgs, partition.Range.Min)
}
if hasMax {
queryArgs = append(queryArgs, partition.Range.Max)
}
rowsRead := 0
rows, err := postgresEx.db.Query(ctx, query, queryArgs...)
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
defer rows.Close()
batchRows := make([]models.UnknownRowValues, 0, batchSize)
for rows.Next() {
values, err := rows.Values()
if err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
rowsRead++
batchRows = append(batchRows, values)
if len(batchRows) >= batchSize {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, ctx.Err()
}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
}
}
if err := rows.Err(); err != nil {
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
}
if len(batchRows) > 0 {
select {
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
case <-ctx.Done():
return rowsRead, nil
}
}
return rowsRead, nil
}

View File

@@ -0,0 +1,75 @@
package extractors
import (
"context"
"errors"
"fmt"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
// "github.com/sirupsen/logrus"
)
func (ex *GenericExtractor) ProcessPartitionWithRetries(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
retryConfig config.RetryConfig,
chBatchesOut chan<- models.Batch,
) (int64, error) {
var totalRowsRead int64
currentParitition := partition
for {
rowsRead, err := ex.ProcessPartition(
ctx,
tableInfo,
columns,
batchSize,
currentParitition,
indexPrimaryKey,
chBatchesOut,
)
// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)
totalRowsRead += rowsRead
if err == nil {
return totalRowsRead, nil
}
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
currentParitition.RetryCounter++
if currentParitition.RetryCounter >= retryConfig.Attempts {
return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
Prev: err,
}
}
if exError.HasLastId {
currentParitition.ParentId = exError.Partition.Id
currentParitition.Id = uuid.New()
currentParitition.Range.Min = exError.LastId
currentParitition.Range.IsMinInclusive = false
}
delay := custom_errors.ComputeBackoffDelay(
currentParitition.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
time.Sleep(delay)
continue
}
return totalRowsRead, err
}
}

View File

@@ -0,0 +1,116 @@
package extractors
import (
"context"
"fmt"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
// "github.com/sirupsen/logrus"
)
func errorFromLastPartitionRow(
lastRow models.UnknownRowValues,
indexPrimaryKey int,
partition models.Partition,
previousError error,
) error {
lastIdRawValue := lastRow[indexPrimaryKey]
lastId, ok := convert.ToInt64(lastIdRawValue)
if !ok {
currentPartition := partition
currentPartition.RetryCounter = 3
return &custom_errors.ExtractorError{
Partition: currentPartition,
HasLastId: true,
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
}
}
return &custom_errors.ExtractorError{
Partition: partition,
HasLastId: true,
LastId: lastId,
Msg: previousError.Error(),
}
}
func (ex *GenericExtractor) ProcessPartition(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int64, error) {
query := dbwrapper.ExtractionQuery{
Schema: tableInfo.Schema,
Table: tableInfo.Table,
PrimaryKey: tableInfo.PrimaryKey,
Columns: columns,
LowerLimit: dbwrapper.ExtractorQueryLimit{
IsValid: partition.HasRange && partition.Range.Min > 0,
IsInclusive: partition.Range.IsMinInclusive,
Value: partition.Range.Min,
},
UpperLimit: dbwrapper.ExtractorQueryLimit{
IsValid: partition.HasRange && partition.Range.Max > 0,
IsInclusive: partition.Range.IsMaxInclusive,
Value: partition.Range.Max,
},
}
// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
rows, err := ex.db.QueryFromObject(ctx, query)
if err != nil {
return 0, err
}
defer rows.Close()
batchRows := make([]models.UnknownRowValues, 0, batchSize)
var rowsRead int64 = 0
for rows.Next() {
rowValues := make([]any, len(columns))
scanArgs := make([]any, len(columns))
for i := range rowValues {
scanArgs[i] = &rowValues[i]
}
if err := rows.Scan(scanArgs...); err != nil {
if len(batchRows) == 0 {
return rowsRead, err
}
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
return rowsRead, err
}
lastRow := batchRows[len(batchRows)-1]
return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err)
}
rowsRead++
batchRows = append(batchRows, rowValues)
if len(batchRows) >= batchSize {
// logrus.Debugf("Batch size reached, flushing batch with %v rows (rowsRead=%v)", len(batchRows), rowsRead)
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
// logrus.Warnf("Error flushing rows: %v", err)
return rowsRead, err
}
batchRows = make([]models.UnknownRowValues, 0, batchSize)
}
}
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
return rowsRead, err
}
return rowsRead, rows.Err()
}

View File

@@ -7,6 +7,7 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
func PartitionRangeGenerator(
@@ -32,6 +33,7 @@ func PartitionRangeGenerator(
}
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table)
if err != nil {
return nil, err
}
@@ -51,5 +53,7 @@ func PartitionRangeGenerator(
return nil, err
}
// logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table)
return partitions, nil
}

View File

@@ -234,6 +234,7 @@ ORDER BY batch_id`,
RetryCounter: 0,
Range: models.PartitionRange{
IsMinInclusive: true,
IsMaxInclusive: true,
},
}

View File

@@ -10,7 +10,7 @@ import (
)
type Extractor interface {
Exec(
ProcessPartition(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,