Compare commits
10 Commits
6ad25e5889
...
2a5f703f3c
| Author | SHA1 | Date | |
|---|---|---|---|
|
2a5f703f3c
|
|||
|
7cb959a103
|
|||
|
6414943cf3
|
|||
|
00459e42e6
|
|||
|
52fe083ab7
|
|||
|
9a00d6af04
|
|||
|
33af391986
|
|||
|
2b2d740d2e
|
|||
|
fbe17b3842
|
|||
|
7bde77dcc5
|
@@ -118,7 +118,7 @@ func processMigrationJobs(
|
|||||||
|
|
||||||
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
|
sourceTableAnalyzer := table_analyzers.NewMssqlTableAnalyzer(sourceDb)
|
||||||
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
targetTableAnalyzer := table_analyzers.NewPostgresTableAnalyzer(targetDb)
|
||||||
extractor := extractors.NewMssqlExtractor(sourceDb)
|
extractor := extractors.NewExtractor(sourceDb)
|
||||||
loader := loaders.NewGenericLoader(targetDb)
|
loader := loaders.NewGenericLoader(targetDb)
|
||||||
|
|
||||||
var azureClient *azure.Client
|
var azureClient *azure.Client
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ func processMigrationJob(
|
|||||||
targetDbWrapper dbwrapper.DbWrapper,
|
targetDbWrapper dbwrapper.DbWrapper,
|
||||||
sourceTableAnalyzer etl.TableAnalyzer,
|
sourceTableAnalyzer etl.TableAnalyzer,
|
||||||
targetTableAnalyzer etl.TableAnalyzer,
|
targetTableAnalyzer etl.TableAnalyzer,
|
||||||
extractor etl.Extractor,
|
extractor extractors.GenericExtractor,
|
||||||
azureClient *azure.Client,
|
azureClient *azure.Client,
|
||||||
loader etl.Loader,
|
loader etl.Loader,
|
||||||
job config.Job,
|
job config.Job,
|
||||||
@@ -110,12 +110,11 @@ func processMigrationJob(
|
|||||||
log.Error("Unexpected error calculating batch ranges: ", err)
|
log.Error("Unexpected error calculating batch ranges: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
chJobErrors := make(chan custom_errors.JobError, job.QueueSize)
|
chJobErrors := make(chan custom_errors.JobError, job.ExtractorQueueSize)
|
||||||
chExtractorErrors := make(chan custom_errors.ExtractorError, job.QueueSize)
|
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
|
||||||
chLoadersErrors := make(chan custom_errors.LoaderError, job.QueueSize)
|
chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
|
||||||
chPartitions := make(chan models.Partition, job.QueueSize)
|
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
|
||||||
chBatchesRaw := make(chan models.Batch, job.QueueSize)
|
chBatchesTransformed := make(chan models.Batch, job.ExtractorQueueSize)
|
||||||
chBatchesTransformed := make(chan models.Batch, job.QueueSize)
|
|
||||||
|
|
||||||
var wgActivePartitions sync.WaitGroup
|
var wgActivePartitions sync.WaitGroup
|
||||||
var wgActiveBatches sync.WaitGroup
|
var wgActiveBatches sync.WaitGroup
|
||||||
@@ -131,19 +130,10 @@ func processMigrationJob(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go custom_errors.ExtractorErrorHandler(
|
|
||||||
localCtx,
|
|
||||||
job.Retry,
|
|
||||||
job.MaxPartitionErrrors,
|
|
||||||
chExtractorErrors,
|
|
||||||
chPartitions,
|
|
||||||
chJobErrors,
|
|
||||||
&wgActivePartitions,
|
|
||||||
)
|
|
||||||
go custom_errors.LoaderErrorHandler(
|
go custom_errors.LoaderErrorHandler(
|
||||||
localCtx,
|
localCtx,
|
||||||
job.Retry,
|
job.Retry,
|
||||||
job.MaxChunkErrors,
|
job.MaxExtractorBatchErrors,
|
||||||
chLoadersErrors,
|
chLoadersErrors,
|
||||||
chBatchesTransformed,
|
chBatchesTransformed,
|
||||||
chJobErrors,
|
chJobErrors,
|
||||||
@@ -155,15 +145,14 @@ func processMigrationJob(
|
|||||||
|
|
||||||
for range maxExtractors {
|
for range maxExtractors {
|
||||||
wgExtractors.Go(func() {
|
wgExtractors.Go(func() {
|
||||||
extractors.Consume(
|
extractor.Consume(
|
||||||
localCtx,
|
localCtx,
|
||||||
extractor,
|
|
||||||
job.SourceTable,
|
job.SourceTable,
|
||||||
sourceColTypes,
|
sourceColTypes,
|
||||||
job.BatchSize,
|
job.ExtractorBatchSize,
|
||||||
|
job.Retry,
|
||||||
chPartitions,
|
chPartitions,
|
||||||
chBatchesRaw,
|
chBatchesRaw,
|
||||||
chExtractorErrors,
|
|
||||||
chJobErrors,
|
chJobErrors,
|
||||||
&wgActivePartitions,
|
&wgActivePartitions,
|
||||||
&rowsRead,
|
&rowsRead,
|
||||||
@@ -217,8 +206,6 @@ func processMigrationJob(
|
|||||||
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
|
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
|
||||||
close(chPartitions)
|
close(chPartitions)
|
||||||
log.Debugf("chPartitions is closed (%v)", job.Name)
|
log.Debugf("chPartitions is closed (%v)", job.Name)
|
||||||
close(chExtractorErrors)
|
|
||||||
log.Debugf("chExtractorErrors is closed (%v)", job.Name)
|
|
||||||
|
|
||||||
wgExtractors.Wait()
|
wgExtractors.Wait()
|
||||||
log.Debugf("wgExtractors is empty (%v)", job.Name)
|
log.Debugf("wgExtractors is empty (%v)", job.Name)
|
||||||
|
|||||||
14
config.yaml
14
config.yaml
@@ -3,15 +3,19 @@ source_db_type: sqlserver
|
|||||||
target_db_type: postgres
|
target_db_type: postgres
|
||||||
|
|
||||||
defaults:
|
defaults:
|
||||||
max_extractors: 2
|
|
||||||
max_loaders: 4
|
|
||||||
queue_size: 8
|
|
||||||
batch_size: 25000
|
|
||||||
batches_per_partition: 8
|
batches_per_partition: 8
|
||||||
|
max_extractors: 2
|
||||||
|
extractor_batch_size: 25000
|
||||||
|
extractor_queue_size: 8
|
||||||
|
max_transformers: 2
|
||||||
|
transformer_batch_size: 25000
|
||||||
|
transformer_queue_size: 8
|
||||||
|
max_loaders: 4
|
||||||
|
loader_batch_size: 25000
|
||||||
truncate_target: true
|
truncate_target: true
|
||||||
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
||||||
max_partition_errrors: 5
|
max_partition_errrors: 5
|
||||||
max_chunk_errors: 5
|
max_extractor_batch_errors: 5
|
||||||
retry:
|
retry:
|
||||||
attempts: 3
|
attempts: 3
|
||||||
base_delay_ms: 500
|
base_delay_ms: 500
|
||||||
|
|||||||
@@ -25,15 +25,19 @@ type ToStorageConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type JobConfig struct {
|
type JobConfig struct {
|
||||||
MaxExtractors int `yaml:"max_extractors"`
|
|
||||||
MaxLoaders int `yaml:"max_loaders"`
|
|
||||||
QueueSize int `yaml:"queue_size"`
|
|
||||||
BatchSize int `yaml:"batch_size"`
|
|
||||||
BatchesPerPartition int `yaml:"batches_per_partition"`
|
BatchesPerPartition int `yaml:"batches_per_partition"`
|
||||||
|
MaxExtractors int `yaml:"max_extractors"`
|
||||||
|
ExtractorBatchSize int `yaml:"extractor_batch_size"`
|
||||||
|
ExtractorQueueSize int `yaml:"extractor_queue_size"`
|
||||||
|
MaxTransformers int `yaml:"max_transformers"`
|
||||||
|
TransformerBatchSize int `yaml:"transformer_batch_size"`
|
||||||
|
TransformerQueueSize int `yaml:"transformer_queue_size"`
|
||||||
|
MaxLoaders int `yaml:"max_loaders"`
|
||||||
|
LoaderBatchSize int `yaml:"loader_batch_size"`
|
||||||
TruncateTarget bool `yaml:"truncate_target"`
|
TruncateTarget bool `yaml:"truncate_target"`
|
||||||
TruncateMethod string `yaml:"truncate_method"`
|
TruncateMethod string `yaml:"truncate_method"`
|
||||||
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
|
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
|
||||||
MaxChunkErrors int `yaml:"max_chunk_errors"`
|
MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"`
|
||||||
Retry RetryConfig `yaml:"retry"`
|
Retry RetryConfig `yaml:"retry"`
|
||||||
RowsPerPartition int64
|
RowsPerPartition int64
|
||||||
ToStorage ToStorageConfig `yaml:"to_storage"`
|
ToStorage ToStorageConfig `yaml:"to_storage"`
|
||||||
@@ -97,7 +101,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
|
|||||||
c.Defaults = raw.Defaults
|
c.Defaults = raw.Defaults
|
||||||
c.SourceDbType = raw.SourceDbType
|
c.SourceDbType = raw.SourceDbType
|
||||||
c.TargetDbType = raw.TargetDbType
|
c.TargetDbType = raw.TargetDbType
|
||||||
c.Defaults.RowsPerPartition = int64(raw.Defaults.BatchSize * raw.Defaults.BatchesPerPartition)
|
c.Defaults.RowsPerPartition = int64(raw.Defaults.ExtractorBatchSize * raw.Defaults.BatchesPerPartition)
|
||||||
|
|
||||||
for _, node := range raw.Jobs {
|
for _, node := range raw.Jobs {
|
||||||
job := Job{
|
job := Job{
|
||||||
@@ -108,7 +112,7 @@ func (c *MigrationConfig) UnmarshalYAML(value *yaml.Node) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
job.RowsPerPartition = int64(job.BatchSize * job.BatchesPerPartition)
|
job.RowsPerPartition = int64(job.ExtractorBatchSize * job.BatchesPerPartition)
|
||||||
|
|
||||||
c.Jobs = append(c.Jobs, job)
|
c.Jobs = append(c.Jobs, job)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func computeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
|
func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJitterMs int) time.Duration {
|
||||||
if retryCounter < 0 {
|
if retryCounter < 0 {
|
||||||
retryCounter = 0
|
retryCounter = 0
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,13 +1,7 @@
|
|||||||
package custom_errors
|
package custom_errors
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ExtractorError struct {
|
type ExtractorError struct {
|
||||||
@@ -20,100 +14,3 @@ type ExtractorError struct {
|
|||||||
func (e *ExtractorError) Error() string {
|
func (e *ExtractorError) Error() string {
|
||||||
return e.Msg
|
return e.Msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExtractorErrorHandler(
|
|
||||||
ctx context.Context,
|
|
||||||
retryConfig config.RetryConfig,
|
|
||||||
maxPartitionErrors int,
|
|
||||||
chErrorsIn <-chan ExtractorError,
|
|
||||||
chPartitionsOut chan<- models.Partition,
|
|
||||||
chJobErrorsOut chan<- JobError,
|
|
||||||
wgActivePartitions *sync.WaitGroup,
|
|
||||||
) {
|
|
||||||
definitiveErrors := 0
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case err, ok := <-chErrorsIn:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err.Partition.RetryCounter >= retryConfig.Attempts {
|
|
||||||
wgActivePartitions.Done()
|
|
||||||
definitiveErrors++
|
|
||||||
jobError := JobError{
|
|
||||||
ShouldCancelJob: false,
|
|
||||||
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", err.Partition.Id, retryConfig.Attempts),
|
|
||||||
Prev: &err,
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chJobErrorsOut <- jobError:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if maxPartitionErrors > 0 && definitiveErrors >= maxPartitionErrors {
|
|
||||||
fatalError := JobError{
|
|
||||||
ShouldCancelJob: true,
|
|
||||||
Msg: fmt.Sprintf("Partition error limit reached (%d)", maxPartitionErrors),
|
|
||||||
Prev: &err,
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chJobErrorsOut <- fatalError:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
jobError := JobError{
|
|
||||||
ShouldCancelJob: false,
|
|
||||||
Msg: fmt.Sprintf("Temporal error in partition %v (retries: %d)", err.Partition.Id, err.Partition.RetryCounter),
|
|
||||||
Prev: &err,
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chJobErrorsOut <- jobError:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
newPartition := err.Partition
|
|
||||||
newPartition.RetryCounter++
|
|
||||||
|
|
||||||
delay := computeBackoffDelay(
|
|
||||||
newPartition.RetryCounter,
|
|
||||||
retryConfig.BaseDelayMs,
|
|
||||||
retryConfig.MaxDelayMs,
|
|
||||||
retryConfig.MaxJitterMs,
|
|
||||||
)
|
|
||||||
|
|
||||||
if err.HasLastId {
|
|
||||||
newPartition.ParentId = err.Partition.Id
|
|
||||||
newPartition.Id = uuid.New()
|
|
||||||
newPartition.Range.Min = err.LastId
|
|
||||||
newPartition.Range.IsMinInclusive = false
|
|
||||||
}
|
|
||||||
|
|
||||||
requeueWithBackoff(ctx, delay, func() {
|
|
||||||
select {
|
|
||||||
case chPartitionsOut <- newPartition:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ func LoaderErrorHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
err.Batch.RetryCounter++
|
err.Batch.RetryCounter++
|
||||||
delay := computeBackoffDelay(
|
delay := ComputeBackoffDelay(
|
||||||
err.Batch.RetryCounter,
|
err.Batch.RetryCounter,
|
||||||
retryConfig.BaseDelayMs,
|
retryConfig.BaseDelayMs,
|
||||||
retryConfig.MaxDelayMs,
|
retryConfig.MaxDelayMs,
|
||||||
|
|||||||
7
internal/app/db-wrapper/db_dialects/main.go
Normal file
7
internal/app/db-wrapper/db_dialects/main.go
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
package db_dialects
|
||||||
|
|
||||||
|
const (
|
||||||
|
SqlServer string = "sqlserver"
|
||||||
|
Postgres string = "postgres"
|
||||||
|
Null string = "null"
|
||||||
|
)
|
||||||
@@ -4,13 +4,15 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
||||||
mssql "github.com/microsoft/go-mssqldb"
|
mssql "github.com/microsoft/go-mssqldb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
Register("sqlserver", func() DbWrapper {
|
Register(dbdialects.SqlServer, func() DbWrapper {
|
||||||
return &mssqlDbWrapper{dialect: "sqlserver"}
|
return &mssqlDbWrapper{dialect: dbdialects.SqlServer}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -174,3 +176,74 @@ func (mw *mssqlDbWrapper) SaveMassive(ctx context.Context, schema string, table
|
|||||||
|
|
||||||
return rowsAffected, nil
|
return rowsAffected, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
|
||||||
|
var sbQuery strings.Builder
|
||||||
|
|
||||||
|
sbQuery.WriteString("SELECT ")
|
||||||
|
|
||||||
|
if len(q.Columns) == 0 {
|
||||||
|
sbQuery.WriteString("*")
|
||||||
|
} else {
|
||||||
|
for i, col := range q.Columns {
|
||||||
|
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
|
||||||
|
|
||||||
|
switch col.Type() {
|
||||||
|
case "GEOMETRY":
|
||||||
|
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
if i < len(q.Columns)-1 {
|
||||||
|
sbQuery.WriteString(", ")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", q.Schema, q.Table)
|
||||||
|
|
||||||
|
if q.LowerLimit.IsValid || q.UpperLimit.IsValid {
|
||||||
|
sbQuery.WriteString(" WHERE ")
|
||||||
|
|
||||||
|
if q.LowerLimit.IsValid {
|
||||||
|
fmt.Fprintf(&sbQuery, "[%s]", q.PrimaryKey)
|
||||||
|
if q.LowerLimit.IsInclusive {
|
||||||
|
sbQuery.WriteString(" >=")
|
||||||
|
} else {
|
||||||
|
sbQuery.WriteString(" >")
|
||||||
|
}
|
||||||
|
sbQuery.WriteString(" @min")
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.LowerLimit.IsValid && q.UpperLimit.IsValid {
|
||||||
|
sbQuery.WriteString(" AND ")
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.UpperLimit.IsValid {
|
||||||
|
fmt.Fprintf(&sbQuery, "[%s]", q.PrimaryKey)
|
||||||
|
if q.UpperLimit.IsInclusive {
|
||||||
|
sbQuery.WriteString(" <=")
|
||||||
|
} else {
|
||||||
|
sbQuery.WriteString(" <")
|
||||||
|
}
|
||||||
|
sbQuery.WriteString(" @max")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", q.PrimaryKey)
|
||||||
|
|
||||||
|
queryString := sbQuery.String()
|
||||||
|
|
||||||
|
// logrus.Debugf("Query: %s", queryString)
|
||||||
|
|
||||||
|
var queryArgs []any
|
||||||
|
|
||||||
|
if q.LowerLimit.IsValid {
|
||||||
|
queryArgs = append(queryArgs, sql.Named("min", q.LowerLimit.Value))
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.UpperLimit.IsValid {
|
||||||
|
queryArgs = append(queryArgs, sql.Named("max", q.UpperLimit.Value))
|
||||||
|
}
|
||||||
|
|
||||||
|
return mw.Query(ctx, queryString, queryArgs...)
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,14 +3,17 @@ package dbwrapper
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
Register("postgres", func() DbWrapper {
|
Register(dbdialects.Postgres, func() DbWrapper {
|
||||||
return &postgresDbWrapper{dialect: "postgres"}
|
return &postgresDbWrapper{dialect: dbdialects.Postgres}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,3 +129,75 @@ func (pw *postgresDbWrapper) SaveMassive(ctx context.Context, schema string, tab
|
|||||||
|
|
||||||
return affectedRows, nil
|
return affectedRows, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pw *postgresDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery) (RowsResult, error) {
|
||||||
|
var sbQuery strings.Builder
|
||||||
|
|
||||||
|
sbQuery.WriteString("SELECT ")
|
||||||
|
|
||||||
|
if len(q.Columns) == 0 {
|
||||||
|
sbQuery.WriteString("*")
|
||||||
|
} else {
|
||||||
|
for i, col := range q.Columns {
|
||||||
|
switch col.Type() {
|
||||||
|
case "GEOMETRY":
|
||||||
|
fmt.Fprintf(&sbQuery, `ST_AsEWKB("%s") AS "%s"`, col.Name(), col.Name())
|
||||||
|
default:
|
||||||
|
fmt.Fprintf(&sbQuery, `"%s"`, col.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
if i < len(q.Columns)-1 {
|
||||||
|
sbQuery.WriteString(", ")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(&sbQuery, ` FROM "%s"."%s"`, q.Schema, q.Table)
|
||||||
|
|
||||||
|
if q.LowerLimit.IsValid || q.UpperLimit.IsValid {
|
||||||
|
sbQuery.WriteString(" WHERE ")
|
||||||
|
paramIdx := 1
|
||||||
|
|
||||||
|
if q.LowerLimit.IsValid {
|
||||||
|
fmt.Fprintf(&sbQuery, `"%s"`, q.PrimaryKey)
|
||||||
|
if q.LowerLimit.IsInclusive {
|
||||||
|
sbQuery.WriteString(" >=")
|
||||||
|
} else {
|
||||||
|
sbQuery.WriteString(" >")
|
||||||
|
}
|
||||||
|
fmt.Fprintf(&sbQuery, " $%d", paramIdx)
|
||||||
|
paramIdx++
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.LowerLimit.IsValid && q.UpperLimit.IsValid {
|
||||||
|
sbQuery.WriteString(" AND ")
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.UpperLimit.IsValid {
|
||||||
|
fmt.Fprintf(&sbQuery, `"%s"`, q.PrimaryKey)
|
||||||
|
if q.UpperLimit.IsInclusive {
|
||||||
|
sbQuery.WriteString(" <=")
|
||||||
|
} else {
|
||||||
|
sbQuery.WriteString(" <")
|
||||||
|
}
|
||||||
|
fmt.Fprintf(&sbQuery, " $%d", paramIdx)
|
||||||
|
paramIdx++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(&sbQuery, ` ORDER BY "%s" ASC`, q.PrimaryKey)
|
||||||
|
|
||||||
|
queryString := sbQuery.String()
|
||||||
|
|
||||||
|
var queryArgs []any
|
||||||
|
|
||||||
|
if q.LowerLimit.IsValid {
|
||||||
|
queryArgs = append(queryArgs, q.LowerLimit.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.UpperLimit.IsValid {
|
||||||
|
queryArgs = append(queryArgs, q.UpperLimit.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pw.Query(ctx, queryString, queryArgs...)
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ package dbwrapper
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
|
var MethodNotSupported error = errors.New("Method not supported by driver... yet :P")
|
||||||
@@ -24,6 +26,21 @@ type RowResult interface {
|
|||||||
Scan(dest ...any) error
|
Scan(dest ...any) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ExtractorQueryLimit struct {
|
||||||
|
IsValid bool
|
||||||
|
IsInclusive bool
|
||||||
|
Value int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type ExtractionQuery struct {
|
||||||
|
Schema string
|
||||||
|
Table string
|
||||||
|
PrimaryKey string
|
||||||
|
Columns []models.ColumnType
|
||||||
|
LowerLimit ExtractorQueryLimit
|
||||||
|
UpperLimit ExtractorQueryLimit
|
||||||
|
}
|
||||||
|
|
||||||
type DbWrapper interface {
|
type DbWrapper interface {
|
||||||
Close() error
|
Close() error
|
||||||
Connect(ctx context.Context, dbUrl string) error
|
Connect(ctx context.Context, dbUrl string) error
|
||||||
@@ -32,4 +49,5 @@ type DbWrapper interface {
|
|||||||
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
|
Query(ctx context.Context, query string, args ...any) (RowsResult, error)
|
||||||
QueryRow(ctx context.Context, query string, args ...any) RowResult
|
QueryRow(ctx context.Context, query string, args ...any) RowResult
|
||||||
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
|
SaveMassive(ctx context.Context, schema string, table string, columnNames []string, rows [][]any) (int64, error)
|
||||||
|
QueryFromObject(ctx context.Context, query ExtractionQuery) (RowsResult, error)
|
||||||
}
|
}
|
||||||
|
|||||||
94
internal/app/etl/extractors/consume.go
Normal file
94
internal/app/etl/extractors/consume.go
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
package extractors
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"slices"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (ex *GenericExtractor) Consume(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo config.SourceTableInfo,
|
||||||
|
columns []models.ColumnType,
|
||||||
|
batchSize int,
|
||||||
|
retryConfig config.RetryConfig,
|
||||||
|
chPartitionsIn <-chan models.Partition,
|
||||||
|
chBatchesOut chan<- models.Batch,
|
||||||
|
chErrorsOut chan<- custom_errors.JobError,
|
||||||
|
wgActivePartitions *sync.WaitGroup,
|
||||||
|
rowsRead *int64,
|
||||||
|
) {
|
||||||
|
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||||
|
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
||||||
|
})
|
||||||
|
|
||||||
|
if indexPrimaryKey == -1 {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case chErrorsOut <- custom_errors.JobError{
|
||||||
|
ShouldCancelJob: true,
|
||||||
|
Msg: "Primary key not found in provided columns",
|
||||||
|
}:
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case partition, ok := <-chPartitionsIn:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rowsReadResult, err := ex.ProcessPartitionWithRetries(
|
||||||
|
ctx,
|
||||||
|
tableInfo,
|
||||||
|
columns,
|
||||||
|
batchSize,
|
||||||
|
partition,
|
||||||
|
indexPrimaryKey,
|
||||||
|
retryConfig,
|
||||||
|
chBatchesOut,
|
||||||
|
)
|
||||||
|
wgActivePartitions.Done()
|
||||||
|
|
||||||
|
if rowsReadResult > 0 {
|
||||||
|
current := atomic.LoadInt64(rowsRead)
|
||||||
|
logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
|
||||||
|
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case chErrorsOut <- *jobError:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,100 +2,41 @@ package extractors
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"slices"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Consume(
|
type GenericExtractor struct {
|
||||||
ctx context.Context,
|
db dbwrapper.DbWrapper
|
||||||
extractor etl.Extractor,
|
}
|
||||||
tableInfo config.SourceTableInfo,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
batchSize int,
|
|
||||||
chPartitionsIn <-chan models.Partition,
|
|
||||||
chBatchesOut chan<- models.Batch,
|
|
||||||
chErrorsOut chan<- custom_errors.ExtractorError,
|
|
||||||
chJobErrorsOut chan<- custom_errors.JobError,
|
|
||||||
wgActivePartitions *sync.WaitGroup,
|
|
||||||
rowsRead *int64,
|
|
||||||
) {
|
|
||||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
|
||||||
return strings.EqualFold(col.Name(), tableInfo.PrimaryKey)
|
|
||||||
})
|
|
||||||
|
|
||||||
if indexPrimaryKey == -1 {
|
func NewExtractor(db dbwrapper.DbWrapper) GenericExtractor {
|
||||||
|
return GenericExtractor{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendBatch(ctx context.Context, chBatchesOut chan<- models.Batch, batch models.Batch) error {
|
||||||
select {
|
select {
|
||||||
|
case chBatchesOut <- batch:
|
||||||
|
return nil
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return ctx.Err()
|
||||||
case chJobErrorsOut <- custom_errors.JobError{
|
|
||||||
ShouldCancelJob: true,
|
|
||||||
Msg: "Primary key not found in provided columns",
|
|
||||||
}:
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case partition, ok := <-chPartitionsIn:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
rowsReadResult, err := extractor.Exec(
|
|
||||||
ctx,
|
|
||||||
tableInfo,
|
|
||||||
columns,
|
|
||||||
batchSize,
|
|
||||||
partition,
|
|
||||||
indexPrimaryKey,
|
|
||||||
chBatchesOut,
|
|
||||||
)
|
|
||||||
|
|
||||||
if rowsReadResult > 0 {
|
|
||||||
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case chErrorsOut <- *exError:
|
|
||||||
}
|
|
||||||
} else if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case chJobErrorsOut <- *jobError:
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case chErrorsOut <- custom_errors.ExtractorError{Partition: partition, Msg: err.Error()}:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
wgActivePartitions.Done()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func flush(
|
||||||
|
ctx context.Context,
|
||||||
|
partition *models.Partition,
|
||||||
|
batchSize int,
|
||||||
|
batchRows []models.UnknownRowValues,
|
||||||
|
chBatchesOut chan<- models.Batch,
|
||||||
|
) error {
|
||||||
|
if len(batchRows) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
batch := models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows}
|
||||||
|
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||||
|
return sendBatch(ctx, chBatchesOut, batch)
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,209 +0,0 @@
|
|||||||
package extractors
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
||||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MssqlExtractor struct {
|
|
||||||
db dbwrapper.DbWrapper
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMssqlExtractor(db dbwrapper.DbWrapper) etl.Extractor {
|
|
||||||
return &MssqlExtractor{db: db}
|
|
||||||
}
|
|
||||||
|
|
||||||
func buildExtractQueryMssql(
|
|
||||||
tableInfo config.SourceTableInfo,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
includeRange bool,
|
|
||||||
isMinInclusive bool,
|
|
||||||
isMaxInclusive bool,
|
|
||||||
hasMin bool,
|
|
||||||
hasMax bool,
|
|
||||||
) string {
|
|
||||||
var sbQuery strings.Builder
|
|
||||||
|
|
||||||
sbQuery.WriteString("SELECT ")
|
|
||||||
|
|
||||||
if len(columns) == 0 {
|
|
||||||
sbQuery.WriteString("*")
|
|
||||||
} else {
|
|
||||||
for i, col := range columns {
|
|
||||||
fmt.Fprintf(&sbQuery, "[%s]", col.Name())
|
|
||||||
|
|
||||||
if col.Type() == "GEOMETRY" {
|
|
||||||
fmt.Fprintf(&sbQuery, ".STAsBinary() AS [%s]", col.Name())
|
|
||||||
}
|
|
||||||
|
|
||||||
if i < len(columns)-1 {
|
|
||||||
sbQuery.WriteString(", ")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Fprintf(&sbQuery, " FROM [%s].[%s]", tableInfo.Schema, tableInfo.Table)
|
|
||||||
|
|
||||||
if includeRange && (hasMin || hasMax) {
|
|
||||||
sbQuery.WriteString(" WHERE ")
|
|
||||||
|
|
||||||
if hasMin {
|
|
||||||
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
|
|
||||||
if isMinInclusive {
|
|
||||||
sbQuery.WriteString(" >=")
|
|
||||||
} else {
|
|
||||||
sbQuery.WriteString(" >")
|
|
||||||
}
|
|
||||||
sbQuery.WriteString(" @min")
|
|
||||||
}
|
|
||||||
|
|
||||||
if hasMin && hasMax {
|
|
||||||
sbQuery.WriteString(" AND ")
|
|
||||||
}
|
|
||||||
|
|
||||||
if hasMax {
|
|
||||||
fmt.Fprintf(&sbQuery, "[%s]", tableInfo.PrimaryKey)
|
|
||||||
if isMaxInclusive {
|
|
||||||
sbQuery.WriteString(" <=")
|
|
||||||
} else {
|
|
||||||
sbQuery.WriteString(" <")
|
|
||||||
}
|
|
||||||
sbQuery.WriteString(" @max")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Fprintf(&sbQuery, " ORDER BY [%s] ASC", tableInfo.PrimaryKey)
|
|
||||||
|
|
||||||
return sbQuery.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
func errorFromLastRow(
|
|
||||||
lastRow models.UnknownRowValues,
|
|
||||||
indexPrimaryKey int,
|
|
||||||
partition models.Partition,
|
|
||||||
previousError error,
|
|
||||||
) *custom_errors.ExtractorError {
|
|
||||||
lastIdRawValue := lastRow[indexPrimaryKey]
|
|
||||||
|
|
||||||
lastId, ok := convert.ToInt64(lastIdRawValue)
|
|
||||||
if !ok {
|
|
||||||
currentPartition := partition
|
|
||||||
currentPartition.RetryCounter = 3
|
|
||||||
return &custom_errors.ExtractorError{
|
|
||||||
Partition: currentPartition,
|
|
||||||
HasLastId: true,
|
|
||||||
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return &custom_errors.ExtractorError{
|
|
||||||
Partition: partition,
|
|
||||||
HasLastId: true,
|
|
||||||
LastId: lastId,
|
|
||||||
Msg: previousError.Error(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mssqlEx *MssqlExtractor) Exec(
|
|
||||||
ctx context.Context,
|
|
||||||
tableInfo config.SourceTableInfo,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
batchSize int,
|
|
||||||
partition models.Partition,
|
|
||||||
indexPrimaryKey int,
|
|
||||||
chBatchesOut chan<- models.Batch,
|
|
||||||
) (int, error) {
|
|
||||||
hasMin := partition.HasRange && partition.Range.Min > 0
|
|
||||||
hasMax := partition.HasRange && partition.Range.Max > 0
|
|
||||||
query := buildExtractQueryMssql(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
|
|
||||||
|
|
||||||
var queryArgs []any
|
|
||||||
if hasMin {
|
|
||||||
queryArgs = append(queryArgs, sql.Named("min", partition.Range.Min))
|
|
||||||
}
|
|
||||||
if hasMax {
|
|
||||||
queryArgs = append(queryArgs, sql.Named("max", partition.Range.Max))
|
|
||||||
}
|
|
||||||
|
|
||||||
rowsRead := 0
|
|
||||||
rows, err := mssqlEx.db.Query(ctx, query, queryArgs...)
|
|
||||||
if err != nil {
|
|
||||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
rowValues := make([]any, len(columns))
|
|
||||||
scanArgs := make([]any, len(columns))
|
|
||||||
|
|
||||||
for i := range rowValues {
|
|
||||||
scanArgs[i] = &rowValues[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rows.Scan(scanArgs...); err != nil {
|
|
||||||
if len(batchRows) == 0 {
|
|
||||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
|
||||||
}
|
|
||||||
|
|
||||||
lastRow := batchRows[len(batchRows)-1]
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return rowsRead, ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
|
||||||
}
|
|
||||||
rowsRead++
|
|
||||||
|
|
||||||
batchRows = append(batchRows, rowValues)
|
|
||||||
if len(batchRows) >= batchSize {
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return rowsRead, ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
if errors.Is(err, ctx.Err()) {
|
|
||||||
return rowsRead, ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(batchRows) > 0 {
|
|
||||||
lastRow := batchRows[len(batchRows)-1]
|
|
||||||
return rowsRead, errorFromLastRow(lastRow, indexPrimaryKey, partition, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(batchRows) > 0 {
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return rowsRead, ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return rowsRead, nil
|
|
||||||
}
|
|
||||||
@@ -1,157 +0,0 @@
|
|||||||
package extractors
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
||||||
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
type PostgresExtractor struct {
|
|
||||||
db dbwrapper.DbWrapper
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPostgresExtractor(db dbwrapper.DbWrapper) etl.Extractor {
|
|
||||||
return &PostgresExtractor{db: db}
|
|
||||||
}
|
|
||||||
|
|
||||||
func buildExtractQueryPostgres(
|
|
||||||
sourceDbInfo config.SourceTableInfo,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
includeRange bool,
|
|
||||||
isMinInclusive bool,
|
|
||||||
isMaxInclusive bool,
|
|
||||||
hasMin bool,
|
|
||||||
hasMax bool,
|
|
||||||
) string {
|
|
||||||
var sbColumns strings.Builder
|
|
||||||
|
|
||||||
if len(columns) == 0 {
|
|
||||||
sbColumns.WriteString("*")
|
|
||||||
} else {
|
|
||||||
for i, col := range columns {
|
|
||||||
if col.Type() == "GEOMETRY" {
|
|
||||||
sbColumns.WriteString(`ST_AsEWKB("`)
|
|
||||||
sbColumns.WriteString(col.Name())
|
|
||||||
sbColumns.WriteString(`") AS "`)
|
|
||||||
sbColumns.WriteString(col.Name())
|
|
||||||
sbColumns.WriteString(`"`)
|
|
||||||
} else {
|
|
||||||
sbColumns.WriteString(`"`)
|
|
||||||
sbColumns.WriteString(col.Name())
|
|
||||||
sbColumns.WriteString(`"`)
|
|
||||||
}
|
|
||||||
|
|
||||||
if i < len(columns)-1 {
|
|
||||||
sbColumns.WriteString(", ")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
query := fmt.Sprintf(`SELECT %s FROM "%s"."%s"`, sbColumns.String(), sourceDbInfo.Schema, sourceDbInfo.Table)
|
|
||||||
|
|
||||||
if includeRange && (hasMin || hasMax) {
|
|
||||||
query += " WHERE "
|
|
||||||
paramIdx := 1
|
|
||||||
|
|
||||||
if hasMin {
|
|
||||||
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
|
|
||||||
if isMinInclusive {
|
|
||||||
query += " >="
|
|
||||||
} else {
|
|
||||||
query += " >"
|
|
||||||
}
|
|
||||||
query += fmt.Sprintf(" $%d", paramIdx)
|
|
||||||
paramIdx++
|
|
||||||
}
|
|
||||||
|
|
||||||
if hasMin && hasMax {
|
|
||||||
query += " AND "
|
|
||||||
}
|
|
||||||
|
|
||||||
if hasMax {
|
|
||||||
query += fmt.Sprintf(`"%s"`, sourceDbInfo.PrimaryKey)
|
|
||||||
if isMaxInclusive {
|
|
||||||
query += " <="
|
|
||||||
} else {
|
|
||||||
query += " <"
|
|
||||||
}
|
|
||||||
query += fmt.Sprintf(" $%d", paramIdx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
query += fmt.Sprintf(` ORDER BY "%s" ASC`, sourceDbInfo.PrimaryKey)
|
|
||||||
|
|
||||||
return query
|
|
||||||
}
|
|
||||||
|
|
||||||
func (postgresEx *PostgresExtractor) Exec(
|
|
||||||
ctx context.Context,
|
|
||||||
tableInfo config.SourceTableInfo,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
batchSize int,
|
|
||||||
partition models.Partition,
|
|
||||||
indexPrimaryKey int,
|
|
||||||
chBatchesOut chan<- models.Batch,
|
|
||||||
) (int, error) {
|
|
||||||
hasMin := partition.HasRange && partition.Range.Min > 0
|
|
||||||
hasMax := partition.HasRange && partition.Range.Max > 0
|
|
||||||
query := buildExtractQueryPostgres(tableInfo, columns, partition.HasRange, partition.Range.IsMinInclusive, partition.Range.IsMaxInclusive, hasMin, hasMax)
|
|
||||||
|
|
||||||
var queryArgs []any
|
|
||||||
if hasMin {
|
|
||||||
queryArgs = append(queryArgs, partition.Range.Min)
|
|
||||||
}
|
|
||||||
if hasMax {
|
|
||||||
queryArgs = append(queryArgs, partition.Range.Max)
|
|
||||||
}
|
|
||||||
|
|
||||||
rowsRead := 0
|
|
||||||
rows, err := postgresEx.db.Query(ctx, query, queryArgs...)
|
|
||||||
if err != nil {
|
|
||||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
values, err := rows.Values()
|
|
||||||
if err != nil {
|
|
||||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
|
||||||
}
|
|
||||||
rowsRead++
|
|
||||||
|
|
||||||
batchRows = append(batchRows, values)
|
|
||||||
|
|
||||||
if len(batchRows) >= batchSize {
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return rowsRead, ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return rowsRead, &custom_errors.ExtractorError{Partition: partition, HasLastId: false, Msg: err.Error()}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(batchRows) > 0 {
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- models.Batch{Id: uuid.New(), PartitionId: partition.Id, Rows: batchRows, RetryCounter: 0}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return rowsRead, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return rowsRead, nil
|
|
||||||
}
|
|
||||||
75
internal/app/etl/extractors/process-with-retries.go
Normal file
75
internal/app/etl/extractors/process-with-retries.go
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
package extractors
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
// "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (ex *GenericExtractor) ProcessPartitionWithRetries(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo config.SourceTableInfo,
|
||||||
|
columns []models.ColumnType,
|
||||||
|
batchSize int,
|
||||||
|
partition models.Partition,
|
||||||
|
indexPrimaryKey int,
|
||||||
|
retryConfig config.RetryConfig,
|
||||||
|
chBatchesOut chan<- models.Batch,
|
||||||
|
) (int64, error) {
|
||||||
|
var totalRowsRead int64
|
||||||
|
currentParitition := partition
|
||||||
|
|
||||||
|
for {
|
||||||
|
rowsRead, err := ex.ProcessPartition(
|
||||||
|
ctx,
|
||||||
|
tableInfo,
|
||||||
|
columns,
|
||||||
|
batchSize,
|
||||||
|
currentParitition,
|
||||||
|
indexPrimaryKey,
|
||||||
|
chBatchesOut,
|
||||||
|
)
|
||||||
|
// logrus.Debugf("Partition %v finished processing (%s.%s)", partition.Id, tableInfo.Schema, tableInfo.Table)
|
||||||
|
totalRowsRead += rowsRead
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
return totalRowsRead, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if exError, ok := errors.AsType[*custom_errors.ExtractorError](err); ok {
|
||||||
|
currentParitition.RetryCounter++
|
||||||
|
|
||||||
|
if currentParitition.RetryCounter >= retryConfig.Attempts {
|
||||||
|
return totalRowsRead, &custom_errors.JobError{
|
||||||
|
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
|
||||||
|
Prev: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if exError.HasLastId {
|
||||||
|
currentParitition.ParentId = exError.Partition.Id
|
||||||
|
currentParitition.Id = uuid.New()
|
||||||
|
currentParitition.Range.Min = exError.LastId
|
||||||
|
currentParitition.Range.IsMinInclusive = false
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := custom_errors.ComputeBackoffDelay(
|
||||||
|
currentParitition.RetryCounter,
|
||||||
|
retryConfig.BaseDelayMs,
|
||||||
|
retryConfig.MaxDelayMs,
|
||||||
|
retryConfig.MaxJitterMs,
|
||||||
|
)
|
||||||
|
time.Sleep(delay)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalRowsRead, err
|
||||||
|
}
|
||||||
|
}
|
||||||
116
internal/app/etl/extractors/process.go
Normal file
116
internal/app/etl/extractors/process.go
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
package extractors
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/convert"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
dbwrapper "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
// "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func errorFromLastPartitionRow(
|
||||||
|
lastRow models.UnknownRowValues,
|
||||||
|
indexPrimaryKey int,
|
||||||
|
partition models.Partition,
|
||||||
|
previousError error,
|
||||||
|
) error {
|
||||||
|
lastIdRawValue := lastRow[indexPrimaryKey]
|
||||||
|
|
||||||
|
lastId, ok := convert.ToInt64(lastIdRawValue)
|
||||||
|
if !ok {
|
||||||
|
currentPartition := partition
|
||||||
|
currentPartition.RetryCounter = 3
|
||||||
|
return &custom_errors.ExtractorError{
|
||||||
|
Partition: currentPartition,
|
||||||
|
HasLastId: true,
|
||||||
|
Msg: fmt.Sprintf("Couldn't cast last id value as int: %s", previousError.Error()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &custom_errors.ExtractorError{
|
||||||
|
Partition: partition,
|
||||||
|
HasLastId: true,
|
||||||
|
LastId: lastId,
|
||||||
|
Msg: previousError.Error(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ex *GenericExtractor) ProcessPartition(
|
||||||
|
ctx context.Context,
|
||||||
|
tableInfo config.SourceTableInfo,
|
||||||
|
columns []models.ColumnType,
|
||||||
|
batchSize int,
|
||||||
|
partition models.Partition,
|
||||||
|
indexPrimaryKey int,
|
||||||
|
chBatchesOut chan<- models.Batch,
|
||||||
|
) (int64, error) {
|
||||||
|
query := dbwrapper.ExtractionQuery{
|
||||||
|
Schema: tableInfo.Schema,
|
||||||
|
Table: tableInfo.Table,
|
||||||
|
PrimaryKey: tableInfo.PrimaryKey,
|
||||||
|
Columns: columns,
|
||||||
|
LowerLimit: dbwrapper.ExtractorQueryLimit{
|
||||||
|
IsValid: partition.HasRange && partition.Range.Min > 0,
|
||||||
|
IsInclusive: partition.Range.IsMinInclusive,
|
||||||
|
Value: partition.Range.Min,
|
||||||
|
},
|
||||||
|
UpperLimit: dbwrapper.ExtractorQueryLimit{
|
||||||
|
IsValid: partition.HasRange && partition.Range.Max > 0,
|
||||||
|
IsInclusive: partition.Range.IsMaxInclusive,
|
||||||
|
Value: partition.Range.Max,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// logrus.Debugf("Processing partition: %+v (%s.%s)", query, tableInfo.Schema, tableInfo.Table)
|
||||||
|
rows, err := ex.db.QueryFromObject(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
batchRows := make([]models.UnknownRowValues, 0, batchSize)
|
||||||
|
var rowsRead int64 = 0
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
rowValues := make([]any, len(columns))
|
||||||
|
scanArgs := make([]any, len(columns))
|
||||||
|
|
||||||
|
for i := range rowValues {
|
||||||
|
scanArgs[i] = &rowValues[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Scan(scanArgs...); err != nil {
|
||||||
|
if len(batchRows) == 0 {
|
||||||
|
return rowsRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
|
||||||
|
return rowsRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lastRow := batchRows[len(batchRows)-1]
|
||||||
|
return rowsRead, errorFromLastPartitionRow(lastRow, indexPrimaryKey, partition, err)
|
||||||
|
}
|
||||||
|
rowsRead++
|
||||||
|
|
||||||
|
batchRows = append(batchRows, rowValues)
|
||||||
|
if len(batchRows) >= batchSize {
|
||||||
|
// logrus.Debugf("Batch size reached, flushing batch with %v rows (rowsRead=%v)", len(batchRows), rowsRead)
|
||||||
|
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
|
||||||
|
// logrus.Warnf("Error flushing rows: %v", err)
|
||||||
|
return rowsRead, err
|
||||||
|
}
|
||||||
|
batchRows = make([]models.UnknownRowValues, 0, batchSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := flush(ctx, &partition, batchSize, batchRows, chBatchesOut); err != nil {
|
||||||
|
return rowsRead, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rowsRead, rows.Err()
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func PartitionRangeGenerator(
|
func PartitionRangeGenerator(
|
||||||
@@ -32,6 +33,7 @@ func PartitionRangeGenerator(
|
|||||||
}
|
}
|
||||||
|
|
||||||
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
|
rowsCount, err := tableAnalyzer.EstimateTotalRows(ctx, tableInfo)
|
||||||
|
logrus.Infof("Estimated rows in source: %v (%s.%s)", rowsCount, tableInfo.Schema, tableInfo.Table)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -51,5 +53,7 @@ func PartitionRangeGenerator(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// logrus.Debugf("Partitions: %+v (%s.%s)", partitions, tableInfo.Schema, tableInfo.Table)
|
||||||
|
|
||||||
return partitions, nil
|
return partitions, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -234,6 +234,7 @@ ORDER BY batch_id`,
|
|||||||
RetryCounter: 0,
|
RetryCounter: 0,
|
||||||
Range: models.PartitionRange{
|
Range: models.PartitionRange{
|
||||||
IsMinInclusive: true,
|
IsMinInclusive: true,
|
||||||
|
IsMaxInclusive: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Extractor interface {
|
type Extractor interface {
|
||||||
Exec(
|
ProcessPartition(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tableInfo config.SourceTableInfo,
|
tableInfo config.SourceTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
|
|||||||
Reference in New Issue
Block a user