Compare commits
10 Commits
6ad25e5889
...
2a5f703f3c
| Author | SHA1 | Date | |
|---|---|---|---|
|
2a5f703f3c
|
|||
|
7cb959a103
|
|||
|
6414943cf3
|
|||
|
00459e42e6
|
|||
|
52fe083ab7
|
|||
|
9a00d6af04
|
|||
|
33af391986
|
|||
|
2b2d740d2e
|
|||
|
fbe17b3842
|
|||
|
7bde77dcc5
|
@@ -118,7 +118,7 @@ func processMigrationJobs(
|
||||
|
||||
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
|
||||
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
||||
extractor := extractors.NewMssqlExtractor(sourceDb)
|
||||
extractor := extractors.NewExtractor(sourceDb)
|
||||
loader := loaders.NewGenericLoader(targetDb)
|
||||
|
||||
var azureClient *azure.Client
|
||||
|
||||
@@ -39,7 +39,7 @@ func processMigrationJob(
|
||||
targetDbWrapper dbwrapper.DbWrapper,
|
||||
sourceTableAnalyzer etl.TableAnalyzer,
|
||||
targetTableAnalyzer etl.TableAnalyzer,
|
||||
extractor etl.Extractor,
|
||||
extractor extractors.GenericExtractor,
|
||||
azureClient *azure.Client,
|
||||
loader etl.Loader,
|
||||
job config.Job,
|
||||
@@ -110,12 +110,11 @@ func processMigrationJob(
|
||||
log.Error("Unexpected error calculating batch ranges: ", err)
|
||||
}
|
||||
|
||||
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
|
||||
chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
|
||||
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
|
||||
chPartitions := make(chan models.Partition, job.QueueSize)
|
||||
chBatchesRaw := make(chan models.Batch, job.QueueSize)
|
||||
chBatchesTransformed := make(chan models.Batch, job.QueueSize)
|
||||
chJobErrors := make(chan custom_errors.JobError, job.ExtractorQueueSize)
|
||||
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
|
||||
chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
|
||||
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
|
||||
chBatchesTransformed := make(chan models.Batch, job.ExtractorQueueSize)
|
||||
|
||||
var wgActivePartitions sync.WaitGroup
|
||||
var wgActiveBatches sync.WaitGroup
|
||||
@@ -131,19 +130,10 @@ func processMigrationJob(
|
||||
}
|
||||
}()
|
||||
|
||||
go custom_errors.ExtractorErrorHandler(
|
||||
localCtx,
|
||||
job.Retry,
|
||||
job.MaxPartitionErrrors,
|
||||
chExtractorErrors,
|
||||
chPartitions,
|
||||
chJobErrors,
|
||||
&wgActivePartitions,
|
||||
)
|
||||
go custom_errors.LoaderErrorHandler(
|
||||
localCtx,
|
||||
job.Retry,
|
||||
job.MaxChunkErrors,
|
||||
job.MaxExtractorBatchErrors,
|
||||
chLoadersErrors,
|
||||
chBatchesTransformed,
|
||||
chJobErrors,
|
||||
@@ -155,15 +145,14 @@ func processMigrationJob(
|
||||
|
||||
for range maxExtractors {
|
||||
wgExtractors.Go(func() {
|
||||
extractors.Consume(
|
||||
extractor.Consume(
|
||||
localCtx,
|
||||
extractor,
|
||||
job.SourceTable,
|
||||
sourceColTypes,
|
||||
job.BatchSize,
|
||||
job.ExtractorBatchSize,
|
||||
job.Retry,
|
||||
chPartitions,
|
||||
chBatchesRaw,
|
||||
chExtractorErrors,
|
||||
chJobErrors,
|
||||
&wgActivePartitions,
|
||||
&rowsRead,
|
||||
@@ -217,8 +206,6 @@ func processMigrationJob(
|
||||
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
|
||||
close(chPartitions)
|
||||
log.Debugf("chPartitions is closed (%v)", job.Name)
|
||||
close(chExtractorErrors)
|
||||
log.Debugf("chExtractorErrors is closed (%v)", job.Name)
|
||||
|
||||
wgExtractors.Wait()
|
||||
log.Debugf("wgExtractors is empty (%v)", job.Name)
|
||||
|
||||
14
config.yaml
14
config.yaml
@@ -3,15 +3,19 @@ source_db_type: sqlserver
|
||||
target_db_type: postgres
|
||||
|
||||
defaults:
|
||||
max_extractors: 2
|
||||
max_loaders: 4
|
||||
queue_size: 8
|
||||
batch_size: 25000
|
||||
batches_per_partition: 8
|
||||
max_extractors: 2
|
||||
extractor_batch_size: 25000
|
||||
extractor_queue_size: 8
|
||||
max_transformers: 2
|
||||
transformer_batch_size: 25000
|
||||
transformer_queue_size: 8
|
||||
max_loaders: 4
|
||||
loader_batch_size: 25000
|
||||
truncate_target: true
|
||||
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
||||
max_partition_errrors: 5
|
||||
max_chunk_errors: 5
|
||||
max_extractor_batch_errors: 5
|
||||
retry:
|
||||
attempts: 3
|
||||
base_delay_ms: 500
|
||||
|
||||
@@ -25,15 +25,19 @@ type ToStorageConfig struct {
|
||||
}
|
||||
|
||||
type JobConfig struct {
|
||||
MaxExtractors int `yaml:"max_extractors"`
|
||||
MaxLoaders int `yaml:"max_loaders"`
|
||||
QueueSize int `yaml:"queue_size"`
|
||||
BatchSize int `yaml:"batch_size"`
|
||||
BatchesPerPartition int `yaml:"batches_per_partition"`
|
||||
MaxExtractors int `yaml:"max_extractors"`
|
||||
ExtractorBatchSize int `yaml:"extractor_batch_size"`
|
||||
ExtractorQueueSize int `yaml:"extractor_queue_size"`
|
||||
MaxTransformers int `yaml:"max_transformers"`
|
||||
TransformerBatchSize int `yaml:"transformer_batch_size"`
|
||||
TransformerQueueSize int `yaml:"transformer_queue_size"`
|
||||
MaxLoaders int `yaml:"max_loaders"`
|
||||
LoaderBatchSize int `yaml:"loader_batch_size"`
|
||||
TruncateTarget bool `yaml:"truncate_target"`
|
||||
TruncateMethod string `yaml:"truncate_method"`
|
||||
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
|
||||
MaxChunkErrors int `yaml:"max_chunk_errors"`
|
||||
MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"`
|
||||
Retry RetryConfig `yaml:"retry"`
|
||||
RowsPerPartition int64
|
||||
ToStorage ToStorageConfig `yaml:"to_storage"`
|
||||
@@ -97,7 +101,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
|
||||
c.Defaults = raw.Defaults
|
||||
c.SourceDbType = raw.SourceDbType
|
||||
c.TargetDbType = raw.TargetDbType
|
||||
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
|
||||
c.Defaults.RowsPerPartition = int64(raw.Defaults.ExtractorBatchSize * raw.Defaults.BatchesPerPartition)
|
||||
|
||||
for _, node := range raw.Jobs {
|
||||
job := Job{
|
||||
@@ -108,7 +112,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
|
||||
return err
|
||||
}
|
||||
|
||||
job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition)
|
||||
job.RowsPerPartition = int64(job.ExtractorBatchSize * job.BatchesPerPartition)
|
||||
|
||||
c.Jobs = append(c.Jobs, job)
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
|
||||
func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
|
||||
if retryCounter < 0 {
|
||||
retryCounter = 0
|
||||
}
|
||||
|
||||
@@ -1,13 +1,7 @@
|
||||
package custom_errors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type ExtractorError struct {
|
||||
@@ -20,100 +14,3 @@ type ExtractorError struct {
|
||||
func (e *ExtractorError) Error() string {
|
||||
return e.Msg
|
||||
}
|
||||
|
||||
func ExtractorErrorHandler(
|
||||
ctx context.Context,
|
||||
retryConfig config.RetryConfig,
|
||||
maxPartitionErrors int,
|
||||
chErrorsIn <-chan ExtractorError,
|
||||
chPartitionsOut chan<- models.Partition,
|
||||
chJobErrorsOut chan<- JobError,
|
||||
wgActivePartitions *sync.WaitGroup,
|
||||
) {
|
||||
definitiveErrors := 0
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case err, ok := <-chErrorsIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if err.Partition.RetryCounter >= retryConfig.Attempts {
|
||||
wgActivePartitions.Done()
|
||||
definitiveErrors++
|
||||
jobError := JobError{
|
||||
ShouldCancelJob: false,
|
||||
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
|
||||
Prev: &err,
|
||||
}
|
||||
|
||||
select {
|
||||
case chJobErrorsOut <- jobError:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
|
||||
fatalError := JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
|
||||
Prev: &err,
|
||||
}
|
||||
|
||||
select {
|
||||
case chJobErrorsOut <- fatalError:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
} else {
|
||||
jobError := JobError{
|
||||
ShouldCancelJob: false,
|
||||
Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter),
|
||||
Prev: &err,
|
||||
}
|
||||
|
||||
select {
|
||||
case chJobErrorsOut <- jobError:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
newPartition := err.Partition
|
||||
newPartition.RetryCounter++
|
||||
|
||||
delay := computeBackoffDelay(
|
||||
newPartition.RetryCounter,
|
||||
retryConfig.BaseDelayMs,
|
||||
retryConfig.MaxDelayMs,
|
||||
retryConfig.MaxJitterMs,
|
||||
)
|
||||
|
||||
if err.HasLastId {
|
||||
newPartition.ParentId = err.Partition.Id
|
||||
newPartition.Id = uuid.New()
|
||||
newPartition.Range.Min = err.LastId
|
||||
newPartition.Range.IsMinInclusive = false
|
||||
}
|
||||
|
||||
requeueWithBackoff(ctx, delay, func() {
|
||||
select {
|
||||
case chPartitionsOut <- newPartition:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,7 +88,7 @@ func LoaderErrorHandler(
|
||||
}
|
||||
|
||||
err.Batch.RetryCounter++
|
||||
delay := computeBackoffDelay(
|
||||
delay := ComputeBackoffDelay(
|
||||
err.Batch.RetryCounter,
|
||||
retryConfig.BaseDelayMs,
|
||||
retryConfig.MaxDelayMs,
|
||||
|
||||
7
internal/app/db-wrapper/db_dialects/main.go
Normal file
7
internal/app/db-wrapper/db_dialects/main.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package db_dialects
|
||||
|
||||
// Dialect identifiers used as keys for the database wrappers.
const (
	// SqlServer is the identifier of the Microsoft SQL Server dialect.
	SqlServer string = "sqlserver"
	// Postgres is the identifier of the PostgreSQL dialect.
	Postgres string = "postgres"
	// Null is the identifier "null".
	// NOTE(review): presumably a placeholder for "no database" — confirm against callers.
	Null string = "null"
)
|
||||
@@ -4,13 +4,15 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
||||
mssql "github.com/microsoft/go-mssqldb"
|
||||
)
|
||||
|
||||
func init() {
|
||||
Register("sqlserver", func() DbWrapper {
|
||||
return &mssqlDbWrapper{dialect: "sqlserver"}
|
||||
Register(dbdialects.SqlServer, func() DbWrapper {
|
||||
return &mssqlDbWrapper{dialect: dbdialects.SqlServer}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -174,3 +176,74 @@ func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table
|
||||
|
||||
return rowsAffected, nil
|
||||
}
|
||||
|
||||
func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
|
||||
var sbQuery strings.Builder
|
||||
|
||||
sbQuery.WriteString("SELECT ")
|
||||
|
||||
if len(q.Columns) == 0 {
|
||||
sbQuery.WriteString("*")
|
||||
} else {
|
||||
for i, col := range q.Columns {
|
||||
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
|
||||
|
||||
switch col.Type() {
|
||||
case "GEOMETRY":
|
||||
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
|
||||
}
|
||||
|
||||
if i < len(q.Columns)-1 {
|
||||
sbQuery.WriteString(", ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", q.Schema, q.Table)
|
||||
|
||||
if q.LowerLimit.IsValid || q.UpperLimit.IsValid {
|
||||
sbQuery.WriteString(" WHERE ")
|
||||
|
||||
if q.LowerLimit.IsValid {
|
||||
fmt.Fprintf(&sbQuery, "[%s]", q.PrimaryKey)
|
||||
if q.LowerLimit.IsInclusive {
|
||||
sbQuery.WriteString(" >=")
|
||||
} else {
|
||||
sbQuery.WriteString(" >")
|
||||
}
|
||||
sbQuery.WriteString(" @min")
|
||||
}
|
||||
|
||||
if q.LowerLimit.IsValid && q.UpperLimit.IsValid {
|
||||
sbQuery.WriteString(" AND ")
|
||||
}
|
||||
|
||||
if q.UpperLimit.IsValid {
|
||||
fmt.Fprintf(&sbQuery, "[%s]", q.PrimaryKey)
|
||||
if q.UpperLimit.IsInclusive {
|
||||
sbQuery.WriteString(" <=")
|
||||
} else {
|
||||
sbQuery.WriteString(" <")
|
||||
}
|
||||
sbQuery.WriteString(" @max")
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", q.PrimaryKey)
|
||||
|
||||
queryString := sbQuery.String()
|
||||
|
||||
// logrus.Debugf("Query: %s", queryString)
|
||||
|
||||
var queryArgs []any
|
||||
|
||||
if q.LowerLimit.IsValid {
|
||||
queryArgs = append(queryArgs, sql.Named("min", q.LowerLimit.Value))
|
||||
}
|
||||
|
||||
if q.UpperLimit.IsValid {
|
||||
queryArgs = append(queryArgs, sql.Named("max", q.UpperLimit.Value))
|
||||
}
|
||||
|
||||
return mw.Query(ctx, queryString, queryArgs...)
|
||||
}
|
||||
|
||||
@@ -3,14 +3,17 @@ package dbwrapper
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
func init() {
|
||||
Register("postgres", func() DbWrapper {
|
||||
return &postgresDbWrapper{dialect: "postgres"}
|
||||
Register(dbdialects.Postgres, func() DbWrapper {
|
||||
return &postgresDbWrapper{dialect: dbdialects.Postgres}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -126,3 +129,75 @@ func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, tab
|
||||
|
||||
return affectedRows, nil
|
||||
}
|
||||
|
||||
func (pw *postgresDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
|
||||
var sbQuery strings.Builder
|
||||
|
||||
sbQuery.WriteString("SELECT ")
|
||||
|
||||
if len(q.Columns) == 0 {
|
||||
sbQuery.WriteString("*")
|
||||
} else {
|
||||
for i, col := range q.Columns {
|
||||
switch col.Type() {
|
||||
case "GEOMETRY":
|
||||
fmt.Fprintf(&sbQuery, `ST_AsEWKB("%s") AS "%s"`, col.Name(), col.Name())
|
||||
default:
|
||||
fmt.Fprintf(&sbQuery, `"%s"`, col.Name())
|
||||
}
|
||||
|
||||
if i < len(q.Columns)-1 {
|
||||
sbQuery.WriteString(", ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(&sbQuery, ` FROM "%s"."%s"`, q.Schema, q.Table)
|
||||
|
||||
if q.LowerLimit.IsValid || q.UpperLimit.IsValid {
|
||||
sbQuery.WriteString(" WHERE ")
|
||||
paramIdx := 1
|
||||
|
||||
if q.LowerLimit.IsValid {
|
||||
fmt.Fprintf(&sbQuery, `"%s"`, q.PrimaryKey)
|
||||
if q.LowerLimit.IsInclusive {
|
||||
sbQuery.WriteString(" >=")
|
||||
} else {
|
||||
sbQuery.WriteString(" >")
|
||||
}
|
||||
fmt.Fprintf(&sbQuery, " $%d", paramIdx)
|
||||
paramIdx++
|
||||
}
|
||||
|
||||
if q.LowerLimit.IsValid && q.UpperLimit.IsValid {
|
||||
sbQuery.WriteString(" AND ")
|
||||
}
|
||||
|
||||
if q.UpperLimit.IsValid {
|
||||
fmt.Fprintf(&sbQuery, `"%s"`, q.PrimaryKey)
|
||||
if q.UpperLimit.IsInclusive {
|
||||
sbQuery.WriteString(" <=")
|
||||
} else {
|
||||
sbQuery.WriteString(" <")
|
||||
}
|
||||
fmt.Fprintf(&sbQuery, " $%d", paramIdx)
|
||||
paramIdx++
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(&sbQuery, ` ORDER BY "%s" ASC`, q.PrimaryKey)
|
||||
|
||||
queryString := sbQuery.String()
|
||||
|
||||
var queryArgs []any
|
||||
|
||||
if q.LowerLimit.IsValid {
|
||||
queryArgs = append(queryArgs, q.LowerLimit.Value)
|
||||
}
|
||||
|
||||
if q.UpperLimit.IsValid {
|
||||
queryArgs = append(queryArgs, q.UpperLimit.Value)
|
||||
}
|
||||
|
||||
return pw.Query(ctx, queryString, queryArgs...)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ package dbwrapper
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
|
||||
@@ -24,6 +26,21 @@ type RowResult interface {
|
||||
Scan(dest ...any) error
|
||||
}
|
||||
|
||||
// ExtractorQueryLimit describes one optional bound of an extraction query.
type ExtractorQueryLimit struct {
	// IsValid reports whether the bound is set at all.
	IsValid bool
	// IsInclusive reports whether Value itself belongs to the range.
	IsInclusive bool
	// Value is the bound.
	Value int64
}
|
||||
|
||||
type ExtractionQuery struct {
|
||||
Schema string
|
||||
Table string
|
||||
PrimaryKey string
|
||||
Columns []models.ColumnType
|
||||
LowerLimit ExtractorQueryLimit
|
||||
UpperLimit ExtractorQueryLimit
|
||||
}
|
||||
|
||||
type DbWrapper interface {
|
||||
Close() error
|
||||
Connect(ctx context.Context, dbUrl string) error
|
||||
@@ -32,4 +49,5 @@ type DbWrapper interface {
|
||||
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
|
||||
QueryRow(ctx context.Context, query string, args ...any) RowResult
|
||||
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
|
||||
QueryFromObject(ctx context.Context, query ExtractionQuery) (RowsResult, error)
|
||||
}
|
||||
|
||||
94
internal/app/etl/extractors/consume.go
Normal file
94
internal/app/etl/extractors/consume.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (ex *GenericExtractor) Consume(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
retryConfig config.RetryConfig,
|
||||
chPartitionsIn <-chan models.Partition,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chErrorsOut chan<- custom_errors.JobError,
|
||||
wgActivePartitions *sync.WaitGroup,
|
||||
rowsRead *int64,
|
||||
) {
|
||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
||||
})
|
||||
|
||||
if indexPrimaryKey == -1 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: "Primary key not found in provided columns",
|
||||
}:
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case partition, ok := <-chPartitionsIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
rowsReadResult, err := ex.ProcessPartitionWithRetries(
|
||||
ctx,
|
||||
tableInfo,
|
||||
columns,
|
||||
batchSize,
|
||||
partition,
|
||||
indexPrimaryKey,
|
||||
retryConfig,
|
||||
chBatchesOut,
|
||||
)
|
||||
wgActivePartitions.Done()
|
||||
|
||||
if rowsReadResult > 0 {
|
||||
current := atomic.LoadInt64(rowsRead)
|
||||
logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
|
||||
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,100 +2,41 @@ package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func Consume(
|
||||
type GenericExtractor struct {
|
||||
db dbwrapper.DbWrapper
|
||||
}
|
||||
|
||||
func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor {
|
||||
return GenericExtractor{db: db}
|
||||
}
|
||||
|
||||
func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error {
|
||||
select {
|
||||
case chBatchesOut <- batch:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func flush(
|
||||
ctx context.Context,
|
||||
extractor etl.Extractor,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
partition *models.Partition,
|
||||
batchSize int,
|
||||
chPartitionsIn <-chan models.Partition,
|
||||
batchRows []models.UnknownRowValues,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chErrorsOut chan<- custom_errors.ExtractorError,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActivePartitions *sync.WaitGroup,
|
||||
rowsRead *int64,
|
||||
) {
|
||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
||||
})
|
||||
|
||||
if indexPrimaryKey == -1 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chJobErrorsOut <- custom_errors.JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: "Primary key not found in provided columns",
|
||||
}:
|
||||
) error {
|
||||
if len(batchRows) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case partition, ok := <-chPartitionsIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
rowsReadResult, err := extractor.Exec(
|
||||
ctx,
|
||||
tableInfo,
|
||||
columns,
|
||||
batchSize,
|
||||
partition,
|
||||
indexPrimaryKey,
|
||||
chBatchesOut,
|
||||
)
|
||||
|
||||
if rowsReadResult > 0 {
|
||||
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- *exError:
|
||||
}
|
||||
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chJobErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
wgActivePartitions.Done()
|
||||
}
|
||||
}
|
||||
batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows}
|
||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||
return sendBatch(ctx, chBatchesOut, batch)
|
||||
}
|
||||
|
||||
@@ -1,209 +0,0 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type MssqlExtractor struct {
|
||||
db dbwrapper.DbWrapper
|
||||
}
|
||||
|
||||
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
|
||||
return &MssqlExtractor{db: db}
|
||||
}
|
||||
|
||||
func buildExtractQueryMssql(
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
includeRange bool,
|
||||
isMinInclusive bool,
|
||||
isMaxInclusive bool,
|
||||
hasMin bool,
|
||||
hasMax bool,
|
||||
) string {
|
||||
var sbQuery strings.Builder
|
||||
|
||||
sbQuery.WriteString("SELECT ")
|
||||
|
||||
if len(columns) == 0 {
|
||||
sbQuery.WriteString("*")
|
||||
} else {
|
||||
for i, col := range columns {
|
||||
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
|
||||
|
||||
if col.Type() == "GEOMETRY" {
|
||||
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
|
||||
}
|
||||
|
||||
if i < len(columns)-1 {
|
||||
sbQuery.WriteString(", ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
|
||||
|
||||
if includeRange && (hasMin || hasMax) {
|
||||
sbQuery.WriteString(" WHERE ")
|
||||
|
||||
if hasMin {
|
||||
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
|
||||
if isMinInclusive {
|
||||
sbQuery.WriteString(" >=")
|
||||
} else {
|
||||
sbQuery.WriteString(" >")
|
||||
}
|
||||
sbQuery.WriteString(" @min")
|
||||
}
|
||||
|
||||
if hasMin && hasMax {
|
||||
sbQuery.WriteString(" AND ")
|
||||
}
|
||||
|
||||
if hasMax {
|
||||
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
|
||||
if isMaxInclusive {
|
||||
sbQuery.WriteString(" <=")
|
||||
} else {
|
||||
sbQuery.WriteString(" <")
|
||||
}
|
||||
sbQuery.WriteString(" @max")
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
|
||||
|
||||
return sbQuery.String()
|
||||
}
|
||||
|
||||
func errorFromLastRow(
|
||||
lastRow models.UnknownRowValues,
|
||||
indexPrimaryKey int,
|
||||
partition models.Partition,
|
||||
previousError error,
|
||||
) *custom_errors.ExtractorError {
|
||||
lastIdRawValue := lastRow[indexPrimaryKey]
|
||||
|
||||
lastId, ok := convert.ToInt64(lastIdRawValue)
|
||||
if !ok {
|
||||
currentPartition := partition
|
||||
currentPartition.RetryCounter = 3
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: currentPartition,
|
||||
HasLastId: true,
|
||||
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: partition,
|
||||
HasLastId: true,
|
||||
LastId: lastId,
|
||||
Msg: previousError.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
func (mssqlEx *MssqlExtractor) Exec(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int, error) {
|
||||
hasMin := partition.HasRange && partition.Range.Min > 0
|
||||
hasMax := partition.HasRange && partition.Range.Max > 0
|
||||
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
|
||||
|
||||
var queryArgs []any
|
||||
if hasMin {
|
||||
queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min))
|
||||
}
|
||||
if hasMax {
|
||||
queryArgs = append(queryArgs, sql.Named("max", partition.Range.Max))
|
||||
}
|
||||
|
||||
rowsRead := 0
|
||||
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
|
||||
if err != nil {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
||||
|
||||
for rows.Next() {
|
||||
rowValues := make([]any, len(columns))
|
||||
scanArgs := make([]any, len(columns))
|
||||
|
||||
for i := range rowValues {
|
||||
scanArgs[i] = &rowValues[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanArgs...); err != nil {
|
||||
if len(batchRows) == 0 {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
rowsRead++
|
||||
|
||||
batchRows = append(batchRows, rowValues)
|
||||
if len(batchRows) >= batchSize {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
if errors.Is(err, ctx.Err()) {
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
return rowsRead, nil
|
||||
}
|
||||
@@ -1,157 +0,0 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type PostgresExtractor struct {
|
||||
db dbwrapper.DbWrapper
|
||||
}
|
||||
|
||||
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
|
||||
return &PostgresExtractor{db: db}
|
||||
}
|
||||
|
||||
func buildExtractQueryPostgres(
|
||||
sourceDbInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
includeRange bool,
|
||||
isMinInclusive bool,
|
||||
isMaxInclusive bool,
|
||||
hasMin bool,
|
||||
hasMax bool,
|
||||
) string {
|
||||
var sbColumns strings.Builder
|
||||
|
||||
if len(columns) == 0 {
|
||||
sbColumns.WriteString("*")
|
||||
} else {
|
||||
for i, col := range columns {
|
||||
if col.Type() == "GEOMETRY" {
|
||||
sbColumns.WriteString(`ST_AsEWKB("`)
|
||||
sbColumns.WriteString(col.Name())
|
||||
sbColumns.WriteString(`") AS "`)
|
||||
sbColumns.WriteString(col.Name())
|
||||
sbColumns.WriteString(`"`)
|
||||
} else {
|
||||
sbColumns.WriteString(`"`)
|
||||
sbColumns.WriteString(col.Name())
|
||||
sbColumns.WriteString(`"`)
|
||||
}
|
||||
|
||||
if i < len(columns)-1 {
|
||||
sbColumns.WriteString(", ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table)
|
||||
|
||||
if includeRange && (hasMin || hasMax) {
|
||||
query += " WHERE "
|
||||
paramIdx := 1
|
||||
|
||||
if hasMin {
|
||||
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
|
||||
if isMinInclusive {
|
||||
query += " >="
|
||||
} else {
|
||||
query += " >"
|
||||
}
|
||||
query += fmt.Sprintf(" $%d", paramIdx)
|
||||
paramIdx++
|
||||
}
|
||||
|
||||
if hasMin && hasMax {
|
||||
query += " AND "
|
||||
}
|
||||
|
||||
if hasMax {
|
||||
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
|
||||
if isMaxInclusive {
|
||||
query += " <="
|
||||
} else {
|
||||
query += " <"
|
||||
}
|
||||
query += fmt.Sprintf(" $%d", paramIdx)
|
||||
}
|
||||
}
|
||||
|
||||
query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey)
|
||||
|
||||
return query
|
||||
}
|
||||
|
||||
func (postgresEx *PostgresExtractor) Exec(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int, error) {
|
||||
hasMin := partition.HasRange && partition.Range.Min > 0
|
||||
hasMax := partition.HasRange && partition.Range.Max > 0
|
||||
query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
|
||||
|
||||
var queryArgs []any
|
||||
if hasMin {
|
||||
queryArgs = append(queryArgs, partition.Range.Min)
|
||||
}
|
||||
if hasMax {
|
||||
queryArgs = append(queryArgs, partition.Range.Max)
|
||||
}
|
||||
|
||||
rowsRead := 0
|
||||
rows, err := postgresEx.db.Query(ctx, query, queryArgs...)
|
||||
if err != nil {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
||||
|
||||
for rows.Next() {
|
||||
values, err := rows.Values()
|
||||
if err != nil {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
rowsRead++
|
||||
|
||||
batchRows = append(batchRows, values)
|
||||
|
||||
if len(batchRows) >= batchSize {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, ctx.Err()
|
||||
}
|
||||
|
||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
||||
}
|
||||
|
||||
if len(batchRows) > 0 {
|
||||
select {
|
||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
||||
case <-ctx.Done():
|
||||
return rowsRead, nil
|
||||
}
|
||||
}
|
||||
|
||||
return rowsRead, nil
|
||||
}
|
||||
75
internal/app/etl/extractors/process-with-retries.go
Normal file
75
internal/app/etl/extractors/process-with-retries.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
// "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (ex *GenericExtractor) ProcessPartitionWithRetries(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
retryConfig config.RetryConfig,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int64, error) {
|
||||
var totalRowsRead int64
|
||||
currentParitition := partition
|
||||
|
||||
for {
|
||||
rowsRead, err := ex.ProcessPartition(
|
||||
ctx,
|
||||
tableInfo,
|
||||
columns,
|
||||
batchSize,
|
||||
currentParitition,
|
||||
indexPrimaryKey,
|
||||
chBatchesOut,
|
||||
)
|
||||
// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)
|
||||
totalRowsRead += rowsRead
|
||||
|
||||
if err == nil {
|
||||
return totalRowsRead, nil
|
||||
}
|
||||
|
||||
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
|
||||
currentParitition.RetryCounter++
|
||||
|
||||
if currentParitition.RetryCounter >= retryConfig.Attempts {
|
||||
return totalRowsRead, &custom_errors.JobError{
|
||||
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
|
||||
Prev: err,
|
||||
}
|
||||
}
|
||||
|
||||
if exError.HasLastId {
|
||||
currentParitition.ParentId = exError.Partition.Id
|
||||
currentParitition.Id = uuid.New()
|
||||
currentParitition.Range.Min = exError.LastId
|
||||
currentParitition.Range.IsMinInclusive = false
|
||||
}
|
||||
|
||||
delay := custom_errors.ComputeBackoffDelay(
|
||||
currentParitition.RetryCounter,
|
||||
retryConfig.BaseDelayMs,
|
||||
retryConfig.MaxDelayMs,
|
||||
retryConfig.MaxJitterMs,
|
||||
)
|
||||
time.Sleep(delay)
|
||||
continue
|
||||
}
|
||||
|
||||
return totalRowsRead, err
|
||||
}
|
||||
}
|
||||
116
internal/app/etl/extractors/process.go
Normal file
116
internal/app/etl/extractors/process.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package extractors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
// "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func errorFromLastPartitionRow(
|
||||
lastRow models.UnknownRowValues,
|
||||
indexPrimaryKey int,
|
||||
partition models.Partition,
|
||||
previousError error,
|
||||
) error {
|
||||
lastIdRawValue := lastRow[indexPrimaryKey]
|
||||
|
||||
lastId, ok := convert.ToInt64(lastIdRawValue)
|
||||
if !ok {
|
||||
currentPartition := partition
|
||||
currentPartition.RetryCounter = 3
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: currentPartition,
|
||||
HasLastId: true,
|
||||
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
return &custom_errors.ExtractorError{
|
||||
Partition: partition,
|
||||
HasLastId: true,
|
||||
LastId: lastId,
|
||||
Msg: previousError.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
func (ex *GenericExtractor) ProcessPartition(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int64, error) {
|
||||
query := dbwrapper.ExtractionQuery{
|
||||
Schema: tableInfo.Schema,
|
||||
Table: tableInfo.Table,
|
||||
PrimaryKey: tableInfo.PrimaryKey,
|
||||
Columns: columns,
|
||||
LowerLimit: dbwrapper.ExtractorQueryLimit{
|
||||
IsValid: partition.HasRange && partition.Range.Min > 0,
|
||||
IsInclusive: partition.Range.IsMinInclusive,
|
||||
Value: partition.Range.Min,
|
||||
},
|
||||
UpperLimit: dbwrapper.ExtractorQueryLimit{
|
||||
IsValid: partition.HasRange && partition.Range.Max > 0,
|
||||
IsInclusive: partition.Range.IsMaxInclusive,
|
||||
Value: partition.Range.Max,
|
||||
},
|
||||
}
|
||||
|
||||
// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
|
||||
rows, err := ex.db.QueryFromObject(ctx, query)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
||||
var rowsRead int64 = 0
|
||||
|
||||
for rows.Next() {
|
||||
rowValues := make([]any, len(columns))
|
||||
scanArgs := make([]any, len(columns))
|
||||
|
||||
for i := range rowValues {
|
||||
scanArgs[i] = &rowValues[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanArgs...); err != nil {
|
||||
if len(batchRows) == 0 {
|
||||
return rowsRead, err
|
||||
}
|
||||
|
||||
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
|
||||
return rowsRead, err
|
||||
}
|
||||
|
||||
lastRow := batchRows[len(batchRows)-1]
|
||||
return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err)
|
||||
}
|
||||
rowsRead++
|
||||
|
||||
batchRows = append(batchRows, rowValues)
|
||||
if len(batchRows) >= batchSize {
|
||||
// logrus.Debugf("Batch size reached, flushing batch with %v rows (rowsRead=%v)", len(batchRows), rowsRead)
|
||||
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
|
||||
// logrus.Warnf("Error flushing rows: %v", err)
|
||||
return rowsRead, err
|
||||
}
|
||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||
}
|
||||
}
|
||||
|
||||
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
|
||||
return rowsRead, err
|
||||
}
|
||||
|
||||
return rowsRead, rows.Err()
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func PartitionRangeGenerator(
|
||||
@@ -32,6 +33,7 @@ func PartitionRangeGenerator(
|
||||
}
|
||||
|
||||
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
|
||||
logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -51,5 +53,7 @@ func PartitionRangeGenerator(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table)
|
||||
|
||||
return partitions, nil
|
||||
}
|
||||
|
||||
@@ -234,6 +234,7 @@ ORDER BY batch_id`,
|
||||
RetryCounter: 0,
|
||||
Range: models.PartitionRange{
|
||||
IsMinInclusive: true,
|
||||
IsMaxInclusive: true,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
type Extractor interface {
|
||||
Exec(
|
||||
ProcessPartition(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
|
||||
Reference in New Issue
Block a user