Compare commits
12 Commits
c4e233401b
...
0c59d06af6
| Author | SHA1 | Date | |
|---|---|---|---|
|
0c59d06af6
|
|||
|
6fa9b21b1c
|
|||
|
a8be31c18b
|
|||
|
5a8bce7701
|
|||
|
b690e580c5
|
|||
|
68d983ea57
|
|||
|
1bc7b67643
|
|||
|
d124da8b20
|
|||
|
b5fd6d0534
|
|||
|
85d7d69da9
|
|||
|
d54108d5e5
|
|||
|
212d3663e2
|
@@ -86,7 +86,7 @@ func main() {
|
||||
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
|
||||
|
||||
totalDuration := time.Since(startTime)
|
||||
log.Infof("=== Migration completed successfully! ===")
|
||||
// log.Infof("=== Migration completed successfully! ===")
|
||||
log.Infof("Total migration time: %v", totalDuration)
|
||||
}
|
||||
|
||||
|
||||
@@ -57,8 +57,6 @@ func processMigrationJob(
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
|
||||
var rowsRead, rowsLoaded, rowsFailed int64
|
||||
|
||||
var wgQueryColumnTypes errgroup.Group
|
||||
var sourceColTypes, targetColTypes []models.ColumnType
|
||||
|
||||
@@ -114,16 +112,13 @@ func processMigrationJob(
|
||||
}
|
||||
|
||||
chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize)
|
||||
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
|
||||
chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
|
||||
chPartitions := make(chan models.Partition)
|
||||
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
|
||||
chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
|
||||
|
||||
var wgActivePartitions sync.WaitGroup
|
||||
var wgActiveBatches sync.WaitGroup
|
||||
var wgExtractors sync.WaitGroup
|
||||
var wgTransformers sync.WaitGroup
|
||||
var wgLoaders sync.WaitGroup
|
||||
var wgActivePartitions, wgActiveBatches, wgExtractors, wgTransformers, wgLoaders sync.WaitGroup
|
||||
var rowsRead, rowsLoaded, rowsFailed int64
|
||||
var failedPartitionsCount, failedBatchesLoadCount int32
|
||||
|
||||
go func() {
|
||||
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
|
||||
@@ -133,18 +128,8 @@ func processMigrationJob(
|
||||
}
|
||||
}()
|
||||
|
||||
go custom_errors.LoaderErrorHandler(
|
||||
localCtx,
|
||||
job.Retry,
|
||||
job.MaxExtractorBatchErrors,
|
||||
chLoadersErrors,
|
||||
chBatchesTransformed,
|
||||
chJobErrors,
|
||||
&wgActiveBatches,
|
||||
)
|
||||
|
||||
maxExtractors := min(job.MaxExtractors, len(partitions))
|
||||
log.Infof("Starting %d extractor(s)...", maxExtractors)
|
||||
log.Infof("Starting %d extractor(s)... (%v)", maxExtractors, job.Name)
|
||||
|
||||
for range maxExtractors {
|
||||
wgExtractors.Go(func() {
|
||||
@@ -159,6 +144,7 @@ func processMigrationJob(
|
||||
chJobErrors,
|
||||
&wgActivePartitions,
|
||||
&rowsRead,
|
||||
&failedPartitionsCount,
|
||||
job.SourceTable.FromJsonColumns,
|
||||
)
|
||||
})
|
||||
@@ -171,13 +157,15 @@ func processMigrationJob(
|
||||
}
|
||||
}()
|
||||
|
||||
log.Infof("Starting %d transformer(s)...", maxExtractors)
|
||||
log.Infof("Starting %d transformer(s)... (%v)", maxExtractors, job.Name)
|
||||
|
||||
for range maxExtractors {
|
||||
wgTransformers.Go(func() {
|
||||
transformer.Exec(
|
||||
transformer.Consume(
|
||||
localCtx,
|
||||
sourceColTypes,
|
||||
job.Retry,
|
||||
job.TransformerBatchSize,
|
||||
chBatchesRaw,
|
||||
chBatchesTransformed,
|
||||
chJobErrors,
|
||||
@@ -186,7 +174,7 @@ func processMigrationJob(
|
||||
})
|
||||
}
|
||||
|
||||
log.Infof("Starting %d loader(s)...", job.MaxLoaders)
|
||||
log.Infof("Starting %d loader(s)... (%v)", job.MaxLoaders, job.Name)
|
||||
|
||||
for range job.MaxLoaders {
|
||||
wgLoaders.Go(func() {
|
||||
@@ -195,39 +183,39 @@ func processMigrationJob(
|
||||
job.TargetTable,
|
||||
targetColTypes,
|
||||
job.Retry,
|
||||
job.LoaderBatchSize,
|
||||
chBatchesTransformed,
|
||||
chJobErrors,
|
||||
&wgActiveBatches,
|
||||
&rowsLoaded,
|
||||
&failedBatchesLoadCount,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
go func() {
|
||||
log.Debugf("Waiting for goroutines (%v)", job.Name)
|
||||
// log.Debugf("Waiting for goroutines (%v)", job.Name)
|
||||
|
||||
wgActivePartitions.Wait()
|
||||
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
|
||||
// log.Debugf("wgActivePartitions is empty (%v)", job.Name)
|
||||
close(chPartitions)
|
||||
log.Debugf("chPartitions is closed (%v)", job.Name)
|
||||
// log.Debugf("chPartitions is closed (%v)", job.Name)
|
||||
|
||||
wgExtractors.Wait()
|
||||
log.Debugf("wgExtractors is empty (%v)", job.Name)
|
||||
// log.Debugf("wgExtractors is empty (%v)", job.Name)
|
||||
close(chBatchesRaw)
|
||||
log.Debugf("chBatchesRaw is closed (%v)", job.Name)
|
||||
// log.Debugf("chBatchesRaw is closed (%v)", job.Name)
|
||||
|
||||
wgTransformers.Wait()
|
||||
log.Debugf("wgTransformers is empty (%v)", job.Name)
|
||||
// log.Debugf("wgTransformers is empty (%v)", job.Name)
|
||||
close(chBatchesTransformed)
|
||||
// log.Debugf("chBatchesTransformed is closed (%v)", job.Name)
|
||||
|
||||
wgActiveBatches.Wait()
|
||||
log.Debugf("wgActiveBatches is empty (%v)", job.Name)
|
||||
close(chBatchesTransformed)
|
||||
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
|
||||
close(chLoadersErrors)
|
||||
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
|
||||
// log.Debugf("wgActiveBatches is empty (%v)", job.Name)
|
||||
|
||||
wgLoaders.Wait()
|
||||
log.Debugf("wgLoaders is empty (%v)", job.Name)
|
||||
// log.Debugf("wgLoaders is empty (%v)", job.Name)
|
||||
|
||||
cancel()
|
||||
}()
|
||||
@@ -239,9 +227,9 @@ func processMigrationJob(
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("waiting for local context to be done (%v)", job.Name)
|
||||
// log.Debugf("waiting for local context to be done (%v)", job.Name)
|
||||
<-localCtx.Done()
|
||||
log.Debugf("local context done (%v)", job.Name)
|
||||
// log.Debugf("local context done (%v)", job.Name)
|
||||
|
||||
if ctx.Err() != nil {
|
||||
result.Error = ctx.Err()
|
||||
@@ -256,5 +244,9 @@ func processMigrationJob(
|
||||
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
|
||||
}
|
||||
|
||||
if result.RowsRead == 0 {
|
||||
log.Warnf("No rows extracted from (%v)", job.Name)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
21
config.yaml
21
config.yaml
@@ -3,24 +3,24 @@ source_db_type: sqlserver
|
||||
target_db_type: postgres
|
||||
|
||||
defaults:
|
||||
batches_per_partition: 8
|
||||
batches_per_partition: 4
|
||||
max_extractors: 2
|
||||
extractor_batch_size: 25000
|
||||
extractor_batch_size: 5000
|
||||
extractor_queue_size: 8
|
||||
max_transformers: 2
|
||||
transformer_batch_size: 25000
|
||||
transformer_batch_size: 12500
|
||||
transformer_queue_size: 8
|
||||
max_loaders: 4
|
||||
loader_batch_size: 25000
|
||||
truncate_target: true
|
||||
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
||||
max_partition_errrors: 5
|
||||
max_extractor_batch_errors: 5
|
||||
retry:
|
||||
attempts: 3
|
||||
base_delay_ms: 500
|
||||
max_delay_ms: 10000
|
||||
max_jitter_ms: 500
|
||||
max_failed_partitions: 5
|
||||
max_failed_batches_load: 5
|
||||
|
||||
jobs:
|
||||
- name: cartografia_manzana
|
||||
@@ -70,10 +70,15 @@ jobs:
|
||||
- source: DATA
|
||||
target: FILE_URL
|
||||
mode: REFERENCE_ONLY
|
||||
max_extractors: 8
|
||||
batches_per_partition: 20
|
||||
max_extractors: 32
|
||||
extractor_batch_size: 1
|
||||
extractor_queue_size: 100
|
||||
max_transformers: 48
|
||||
transformer_batch_size: 500
|
||||
transformer_queue_size: 8
|
||||
max_loaders: 4
|
||||
queue_size: 32
|
||||
batch_size: 1
|
||||
loader_batch_size: 500
|
||||
retry:
|
||||
attempts: 5
|
||||
base_delay_ms: 1000
|
||||
|
||||
@@ -12,6 +12,8 @@ type RetryConfig struct {
|
||||
BaseDelayMs int `yaml:"base_delay_ms"`
|
||||
MaxDelayMs int `yaml:"max_delay_ms"`
|
||||
MaxJitterMs int `yaml:"max_jitter_ms"`
|
||||
MaxFailedPartitions int `yaml:"max_failed_partitions"`
|
||||
MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"`
|
||||
}
|
||||
|
||||
type ToStorageColumnConfig struct {
|
||||
@@ -36,8 +38,6 @@ type JobConfig struct {
|
||||
LoaderBatchSize int `yaml:"loader_batch_size"`
|
||||
TruncateTarget bool `yaml:"truncate_target"`
|
||||
TruncateMethod string `yaml:"truncate_method"`
|
||||
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
|
||||
MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"`
|
||||
Retry RetryConfig `yaml:"retry"`
|
||||
RowsPerPartition int64
|
||||
ToStorage ToStorageConfig `yaml:"to_storage"`
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package custom_errors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
@@ -40,22 +39,3 @@ func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJ
|
||||
|
||||
return delay
|
||||
}
|
||||
|
||||
func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) {
|
||||
if delay <= 0 {
|
||||
enqueue()
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
timer := time.NewTimer(delay)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-timer.C:
|
||||
enqueue()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
package custom_errors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
type LoaderError struct {
|
||||
Batch models.Batch
|
||||
Msg string
|
||||
}
|
||||
|
||||
func (e *LoaderError) Error() string {
|
||||
return e.Msg
|
||||
}
|
||||
|
||||
func LoaderErrorHandler(
|
||||
ctx context.Context,
|
||||
retryConfig config.RetryConfig,
|
||||
maxChunkErrors int,
|
||||
chErrorsIn <-chan LoaderError,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chJobErrorsOut chan<- JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
) {
|
||||
definitiveErrors := 0
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case err, ok := <-chErrorsIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if err.Batch.RetryCounter >= retryConfig.Attempts {
|
||||
wgActiveBatches.Done()
|
||||
definitiveErrors++
|
||||
jobError := JobError{
|
||||
ShouldCancelJob: false,
|
||||
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
|
||||
Prev: &err,
|
||||
}
|
||||
|
||||
select {
|
||||
case chJobErrorsOut <- jobError:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
|
||||
fatalError := JobError{
|
||||
ShouldCancelJob: true,
|
||||
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
|
||||
Prev: &err,
|
||||
}
|
||||
|
||||
select {
|
||||
case chJobErrorsOut <- fatalError:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
} else {
|
||||
jobError := JobError{
|
||||
ShouldCancelJob: false,
|
||||
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter),
|
||||
Prev: &err,
|
||||
}
|
||||
|
||||
select {
|
||||
case chJobErrorsOut <- jobError:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err.Batch.RetryCounter++
|
||||
delay := ComputeBackoffDelay(
|
||||
err.Batch.RetryCounter,
|
||||
retryConfig.BaseDelayMs,
|
||||
retryConfig.MaxDelayMs,
|
||||
retryConfig.MaxJitterMs,
|
||||
)
|
||||
|
||||
requeueWithBackoff(ctx, delay, func() {
|
||||
select {
|
||||
case chBatchesOut <- err.Batch:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,3 +14,12 @@ type ExtractorError struct {
|
||||
func (e *ExtractorError) Error() string {
|
||||
return e.Msg
|
||||
}
|
||||
|
||||
type LoaderError struct {
|
||||
Batch models.Batch
|
||||
Msg string
|
||||
}
|
||||
|
||||
func (e *LoaderError) Error() string {
|
||||
return e.Msg
|
||||
}
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
mssql "github.com/microsoft/go-mssqldb"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -188,8 +187,6 @@ func buildExtractQueryMssql(q ExtractionQuery) (string, error) {
|
||||
hasRegularColumns := len(q.Columns) > 0
|
||||
hasJsonColumns := len(q.FromJsonColumns) > 0
|
||||
|
||||
// logrus.Debugf("Extraction query: %+v", q)
|
||||
|
||||
resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
|
||||
if hasJsonColumns {
|
||||
for _, jsonConfig := range q.FromJsonColumns {
|
||||
@@ -296,7 +293,7 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logrus.Debugf("Query: %s", queryString)
|
||||
// logrus.Debugf("Query: %s", queryString)
|
||||
|
||||
var queryArgs []any
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume(
|
||||
chErrorsOut chan<- custom_errors.JobError,
|
||||
wgActivePartitions *sync.WaitGroup,
|
||||
rowsRead *int64,
|
||||
failedPartitionsCount *int32,
|
||||
fromJsonColumns []config.FromJsonItem,
|
||||
) {
|
||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||
@@ -72,11 +73,12 @@ func (ex *GenericExtractor) Consume(
|
||||
|
||||
if rowsReadResult > 0 {
|
||||
current := atomic.LoadInt64(rowsRead)
|
||||
logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
|
||||
logrus.Debugf("Rows read (partition extracted): +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
|
||||
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
atomic.AddInt32(failedPartitionsCount, 1)
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -90,6 +92,16 @@ func (ex *GenericExtractor) Consume(
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||
}
|
||||
}
|
||||
|
||||
currentFPCount := atomic.LoadInt32(failedPartitionsCount)
|
||||
if currentFPCount > int32(retryConfig.MaxFailedPartitions) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed partitions reached"}:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
|
||||
|
||||
if currentParitition.RetryCounter >= retryConfig.Attempts {
|
||||
return totalRowsRead, &custom_errors.JobError{
|
||||
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
|
||||
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", currentParitition.Id, currentParitition.RetryCounter),
|
||||
Prev: err,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (gl *GenericLoader) Consume(
|
||||
@@ -16,15 +18,77 @@ func (gl *GenericLoader) Consume(
|
||||
tableInfo config.TargetTableInfo,
|
||||
columns []models.ColumnType,
|
||||
retryConfig config.RetryConfig,
|
||||
batchSize int,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
rowsLoaded *int64,
|
||||
failedBatchesCount *int32,
|
||||
) {
|
||||
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
||||
return col.Name()
|
||||
})
|
||||
|
||||
var accRows []models.UnknownRowValues
|
||||
var parentBatchesId []uuid.UUID
|
||||
pendingDone := 0
|
||||
|
||||
defer func() {
|
||||
for range pendingDone {
|
||||
wgActiveBatches.Done()
|
||||
}
|
||||
}()
|
||||
|
||||
flush := func() bool {
|
||||
if len(accRows) == 0 {
|
||||
return true
|
||||
}
|
||||
count := len(parentBatchesId)
|
||||
superBatch := models.Batch{
|
||||
Id: uuid.New(),
|
||||
ParentBatchesId: parentBatchesId,
|
||||
Rows: accRows,
|
||||
}
|
||||
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, superBatch)
|
||||
for range count {
|
||||
wgActiveBatches.Done()
|
||||
}
|
||||
pendingDone -= count
|
||||
accRows = nil
|
||||
parentBatchesId = nil
|
||||
|
||||
if err != nil {
|
||||
atomic.AddInt32(failedBatchesCount, 1)
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case chErrorsOut <- *jobError:
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||
}
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
current := atomic.LoadInt64(rowsLoaded)
|
||||
logrus.Debugf("Rows loaded (batch loaded): +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
|
||||
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||
return true
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
@@ -35,13 +99,16 @@ func (gl *GenericLoader) Consume(
|
||||
return
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
flush()
|
||||
return
|
||||
}
|
||||
|
||||
if batchSize <= 0 {
|
||||
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
|
||||
wgActiveBatches.Done()
|
||||
|
||||
if err != nil {
|
||||
atomic.AddInt32(failedBatchesCount, 1)
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -55,8 +122,32 @@ func (gl *GenericLoader) Consume(
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
|
||||
return
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
current := atomic.LoadInt64(rowsLoaded)
|
||||
logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
|
||||
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||
continue
|
||||
}
|
||||
|
||||
pendingDone++
|
||||
accRows = append(accRows, batch.Rows...)
|
||||
parentBatchesId = append(parentBatchesId, batch.Id)
|
||||
|
||||
if len(accRows) >= batchSize {
|
||||
if !flush() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,8 +29,7 @@ func (gl *GenericLoader) ProcessBatchWithRetries(
|
||||
|
||||
if batch.RetryCounter >= retryConfig.Attempts {
|
||||
return rowsLoaded, &custom_errors.JobError{
|
||||
ShouldCancelJob: false,
|
||||
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", btError.Batch.Id, btError.Batch.RetryCounter),
|
||||
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", batch.Id, batch.RetryCounter),
|
||||
Prev: btError,
|
||||
}
|
||||
}
|
||||
|
||||
117
internal/app/etl/transformers/consume.go
Normal file
117
internal/app/etl/transformers/consume.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package transformers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func (mssqlTr *MssqlTransformer) Consume(
|
||||
ctx context.Context,
|
||||
columns []models.ColumnType,
|
||||
retryConfig config.RetryConfig,
|
||||
batchSize int,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
) {
|
||||
transformationPlan := computeTransformationPlan(columns)
|
||||
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
|
||||
transformationPlan = append(transformationPlan, storagePlan...)
|
||||
|
||||
var accRows []models.UnknownRowValues
|
||||
var parentBatchesId []uuid.UUID
|
||||
var firstPartitionId uuid.UUID
|
||||
|
||||
flush := func() bool {
|
||||
if len(accRows) == 0 {
|
||||
return true
|
||||
}
|
||||
out := models.Batch{
|
||||
Id: uuid.New(),
|
||||
PartitionId: firstPartitionId,
|
||||
ParentBatchesId: parentBatchesId,
|
||||
Rows: accRows,
|
||||
}
|
||||
select {
|
||||
case chBatchesOut <- out:
|
||||
wgActiveBatches.Add(1)
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
accRows = nil
|
||||
parentBatchesId = nil
|
||||
firstPartitionId = uuid.Nil
|
||||
return true
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
flush()
|
||||
return
|
||||
}
|
||||
|
||||
if len(transformationPlan) > 0 {
|
||||
err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig)
|
||||
if err != nil {
|
||||
if errors.Is(err, ctx.Err()) {
|
||||
return
|
||||
}
|
||||
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
select {
|
||||
case chJobErrorsOut <- *jobError:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if batchSize <= 0 {
|
||||
select {
|
||||
case chBatchesOut <- batch:
|
||||
wgActiveBatches.Add(1)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if len(parentBatchesId) == 0 {
|
||||
firstPartitionId = batch.PartitionId
|
||||
}
|
||||
accRows = append(accRows, batch.Rows...)
|
||||
parentBatchesId = append(parentBatchesId, batch.Id)
|
||||
|
||||
if len(accRows) >= batchSize {
|
||||
if !flush() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,20 +1,9 @@
|
||||
package transformers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type MssqlTransformer struct {
|
||||
@@ -30,196 +19,3 @@ func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.So
|
||||
azureClient: azureClient,
|
||||
}
|
||||
}
|
||||
|
||||
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
||||
var plan []etl.ColumnTransformPlan
|
||||
|
||||
for i, col := range columns {
|
||||
switch col.SystemType() {
|
||||
case "uniqueidentifier":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return mssqlUuidToBigEndian(b)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "geometry", "geography":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return wkbToEwkbWithSrid(b, 4326)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "datetime", "datetime2":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if t, ok := v.(time.Time); ok {
|
||||
return ensureUTC(t), nil
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return plan
|
||||
}
|
||||
|
||||
func computeStorageTransformationPlan(
|
||||
ctx context.Context,
|
||||
azureClient *azure.Client,
|
||||
toStorage config.ToStorageConfig,
|
||||
sourceColumns []models.ColumnType,
|
||||
sourceTable config.SourceTableInfo,
|
||||
) []etl.ColumnTransformPlan {
|
||||
if azureClient == nil || len(toStorage.Columns) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
colIndex := make(map[string]int, len(sourceColumns))
|
||||
for i, col := range sourceColumns {
|
||||
colIndex[strings.ToUpper(col.Name())] = i
|
||||
}
|
||||
|
||||
var plan []etl.ColumnTransformPlan
|
||||
for _, storageCol := range toStorage.Columns {
|
||||
if storageCol.Mode != "REFERENCE_ONLY" {
|
||||
log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
|
||||
if !ok {
|
||||
log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
sourceColName := storageCol.Source
|
||||
schema := sourceTable.Schema
|
||||
table := sourceTable.Table
|
||||
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: idx,
|
||||
Fn: func(v any) (any, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
b, ok := v.([]byte)
|
||||
if !ok {
|
||||
log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
|
||||
schema, table, sourceColName, v)
|
||||
return v, nil
|
||||
}
|
||||
start := time.Now()
|
||||
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
|
||||
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
|
||||
}
|
||||
log.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
|
||||
return blobURL, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
return plan
|
||||
}
|
||||
|
||||
const processBatchCtxCheck = 4096
|
||||
|
||||
func (mssqlTr *MssqlTransformer) ProcessBatch(
|
||||
ctx context.Context,
|
||||
batch *models.Batch,
|
||||
transformationPlan []etl.ColumnTransformPlan,
|
||||
) error {
|
||||
for i, rowValues := range batch.Rows {
|
||||
if i%processBatchCtxCheck == 0 {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, task := range transformationPlan {
|
||||
val := rowValues[task.Index]
|
||||
if val == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
transformed, err := task.Fn(val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rowValues[task.Index] = transformed
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mssqlTr *MssqlTransformer) Exec(
|
||||
ctx context.Context,
|
||||
columns []models.ColumnType,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
) {
|
||||
transformationPlan := computeTransformationPlan(columns)
|
||||
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
|
||||
transformationPlan = append(transformationPlan, storagePlan...)
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case batch, ok := <-chBatchesIn:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if len(transformationPlan) == 0 {
|
||||
select {
|
||||
case chBatchesOut <- batch:
|
||||
wgActiveBatches.Add(1)
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
|
||||
if err != nil {
|
||||
if errors.Is(err, ctx.Err()) {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case chBatchesOut <- batch:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
wgActiveBatches.Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
122
internal/app/etl/transformers/plan.go
Normal file
122
internal/app/etl/transformers/plan.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package transformers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
"github.com/google/uuid"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
||||
var plan []etl.ColumnTransformPlan
|
||||
|
||||
for i, col := range columns {
|
||||
switch col.SystemType() {
|
||||
case "uniqueidentifier":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return mssqlUuidToBigEndian(b)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "geometry", "geography":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if b, ok := v.([]byte); ok && b != nil {
|
||||
return wkbToEwkbWithSrid(b, 4326)
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
|
||||
case "datetime", "datetime2":
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: i,
|
||||
Fn: func(v any) (any, error) {
|
||||
if t, ok := v.(time.Time); ok {
|
||||
return ensureUTC(t), nil
|
||||
}
|
||||
return v, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return plan
|
||||
}
|
||||
|
||||
func computeStorageTransformationPlan(
|
||||
ctx context.Context,
|
||||
azureClient *azure.Client,
|
||||
toStorage config.ToStorageConfig,
|
||||
sourceColumns []models.ColumnType,
|
||||
sourceTable config.SourceTableInfo,
|
||||
) []etl.ColumnTransformPlan {
|
||||
if azureClient == nil || len(toStorage.Columns) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
colIndex := make(map[string]int, len(sourceColumns))
|
||||
for i, col := range sourceColumns {
|
||||
colIndex[strings.ToUpper(col.Name())] = i
|
||||
}
|
||||
|
||||
var plan []etl.ColumnTransformPlan
|
||||
for _, storageCol := range toStorage.Columns {
|
||||
if storageCol.Mode != "REFERENCE_ONLY" {
|
||||
logrus.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
|
||||
if !ok {
|
||||
logrus.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
|
||||
continue
|
||||
}
|
||||
|
||||
sourceColName := storageCol.Source
|
||||
schema := sourceTable.Schema
|
||||
table := sourceTable.Table
|
||||
|
||||
plan = append(plan, etl.ColumnTransformPlan{
|
||||
Index: idx,
|
||||
Fn: func(v any) (any, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
b, ok := v.([]byte)
|
||||
if !ok {
|
||||
logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
|
||||
schema, table, sourceColName, v)
|
||||
return v, nil
|
||||
}
|
||||
// start := time.Now()
|
||||
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
|
||||
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
|
||||
if err != nil {
|
||||
return nil, &custom_errors.JobError{
|
||||
Msg: fmt.Sprintf("Error uploading %s.%s.%s", schema, table, sourceColName),
|
||||
Prev: err,
|
||||
}
|
||||
}
|
||||
|
||||
// logrus.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
|
||||
return blobURL, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
return plan
|
||||
}
|
||||
73
internal/app/etl/transformers/process-with-retries.go
Normal file
73
internal/app/etl/transformers/process-with-retries.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package transformers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
const processBatchCtxCheck = 4096
|
||||
|
||||
func ProcessBatchWithRetries(
|
||||
ctx context.Context,
|
||||
batch *models.Batch,
|
||||
transformationPlan []etl.ColumnTransformPlan,
|
||||
retryConfig config.RetryConfig,
|
||||
) error {
|
||||
for i, rowValues := range batch.Rows {
|
||||
if i%processBatchCtxCheck == 0 {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, task := range transformationPlan {
|
||||
val := rowValues[task.Index]
|
||||
if val == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
success := false
|
||||
|
||||
for attempt := 0; attempt < retryConfig.Attempts; attempt++ {
|
||||
transformed, err := task.Fn(val)
|
||||
if err == nil {
|
||||
rowValues[task.Index] = transformed
|
||||
success = true
|
||||
break
|
||||
}
|
||||
|
||||
lastErr = err
|
||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||
if jobError.ShouldCancelJob {
|
||||
return jobError
|
||||
}
|
||||
}
|
||||
|
||||
if attempt == retryConfig.Attempts-1 {
|
||||
break
|
||||
}
|
||||
|
||||
delay := custom_errors.ComputeBackoffDelay(
|
||||
attempt,
|
||||
retryConfig.BaseDelayMs,
|
||||
retryConfig.MaxDelayMs,
|
||||
retryConfig.MaxJitterMs,
|
||||
)
|
||||
time.Sleep(delay)
|
||||
}
|
||||
|
||||
if !success {
|
||||
return lastErr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
package transformers
|
||||
@@ -9,18 +9,6 @@ import (
|
||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||
)
|
||||
|
||||
type Extractor interface {
|
||||
ProcessPartition(
|
||||
ctx context.Context,
|
||||
tableInfo config.SourceTableInfo,
|
||||
columns []models.ColumnType,
|
||||
batchSize int,
|
||||
partition models.Partition,
|
||||
indexPrimaryKey int,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
) (int, error)
|
||||
}
|
||||
|
||||
type TransformerFunc func(any) (any, error)
|
||||
|
||||
type ColumnTransformPlan struct {
|
||||
@@ -29,31 +17,18 @@ type ColumnTransformPlan struct {
|
||||
}
|
||||
|
||||
type Transformer interface {
|
||||
ProcessBatch(
|
||||
ctx context.Context,
|
||||
batch *models.Batch,
|
||||
transformationPlan []ColumnTransformPlan,
|
||||
) error
|
||||
|
||||
Exec(
|
||||
Consume(
|
||||
ctx context.Context,
|
||||
columns []models.ColumnType,
|
||||
retryConfig config.RetryConfig,
|
||||
batchSize int,
|
||||
chBatchesIn <-chan models.Batch,
|
||||
chBactchesOut chan<- models.Batch,
|
||||
chBatchesOut chan<- models.Batch,
|
||||
chJobErrorsOut chan<- custom_errors.JobError,
|
||||
wgActiveBatches *sync.WaitGroup,
|
||||
)
|
||||
}
|
||||
|
||||
type Loader interface {
|
||||
ProcessBatch(
|
||||
ctx context.Context,
|
||||
tableInfo config.TargetTableInfo,
|
||||
colNames []string,
|
||||
batch models.Batch,
|
||||
) (int64, error)
|
||||
}
|
||||
|
||||
type TableAnalyzer interface {
|
||||
QueryColumnTypes(
|
||||
ctx context.Context,
|
||||
|
||||
@@ -11,6 +11,7 @@ type UnknownRowValues = []any
|
||||
type Batch struct {
|
||||
Id uuid.UUID
|
||||
PartitionId uuid.UUID
|
||||
ParentBatchesId []uuid.UUID
|
||||
Rows []UnknownRowValues
|
||||
RetryCounter int
|
||||
}
|
||||
|
||||
@@ -12,8 +12,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
totalRows int = 1_000_000
|
||||
chunkSize int = 50_000
|
||||
// totalRows int = 1_000_000
|
||||
totalRows int = 1000
|
||||
chunkSize int = 200
|
||||
queueSize int = 4
|
||||
)
|
||||
|
||||
@@ -40,6 +41,14 @@ func main() {
|
||||
seedManzanas(ctx, db)
|
||||
})
|
||||
|
||||
wgSeed.Go(func() {
|
||||
seedPuertos(ctx, db)
|
||||
})
|
||||
|
||||
wgSeed.Go(func() {
|
||||
seedSiteHolderAttach(ctx, db)
|
||||
})
|
||||
|
||||
wgSeed.Wait()
|
||||
}
|
||||
|
||||
|
||||
227
scripts/mssql-copy-in/site-holder-attach.go
Normal file
227
scripts/mssql-copy-in/site-holder-attach.go
Normal file
@@ -0,0 +1,227 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var siteHolderAttachJob = MigrationJob{
|
||||
Schema: "Infraestructura",
|
||||
Table: "SITE_HOLDER__ATTACH",
|
||||
}
|
||||
|
||||
func seedSiteHolderAttach(ctx context.Context, db *sql.DB) error {
|
||||
maxOid, err := getMaxGDBArchiveOidForAttach(ctx, db)
|
||||
if err != nil {
|
||||
log.Fatal("Error getting max GDB_ARCHIVE_OID: ", err)
|
||||
}
|
||||
|
||||
log.Infof("Starting SITE_HOLDER__ATTACH data generation from GDB_ARCHIVE_OID: %d", maxOid+1)
|
||||
|
||||
rowsChan := make(chan []UnknownRowValues, queueSize)
|
||||
|
||||
var wgRowGenerator sync.WaitGroup
|
||||
|
||||
wgRowGenerator.Go(func() {
|
||||
generateSiteHolderAttachRows(ctx, maxOid, totalRows, chunkSize, rowsChan)
|
||||
})
|
||||
|
||||
columns := []string{
|
||||
"GDB_ARCHIVE_OID",
|
||||
"REL_GLOBALID",
|
||||
"CONTENT_TYPE",
|
||||
"ATT_NAME",
|
||||
"DATA_SIZE",
|
||||
"DATA",
|
||||
"GLOBALID",
|
||||
"GDB_FROM_DATE",
|
||||
"GDB_TO_DATE",
|
||||
"ATTACHMENTID",
|
||||
}
|
||||
|
||||
if err := loadRowsMssql(ctx, siteHolderAttachJob, columns, db, rowsChan); err != nil {
|
||||
return fmt.Errorf("Error loading rows (SITE_HOLDER__ATTACH): %w", err)
|
||||
}
|
||||
|
||||
log.Info("Data generation and loading completed successfully (SITE_HOLDER__ATTACH)")
|
||||
wgRowGenerator.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMaxGDBArchiveOidForAttach(ctx context.Context, db *sql.DB) (int, error) {
|
||||
var maxOid sql.NullInt64
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0)
|
||||
FROM [%s].[%s]
|
||||
`, siteHolderAttachJob.Schema, siteHolderAttachJob.Table)
|
||||
|
||||
err := db.QueryRowContext(ctx, query).Scan(&maxOid)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if !maxOid.Valid {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return int(maxOid.Int64), nil
|
||||
}
|
||||
|
||||
func generateSiteHolderAttachRows(
|
||||
ctx context.Context,
|
||||
startOid int,
|
||||
totalRows int,
|
||||
chunkSize int,
|
||||
out chan<- []UnknownRowValues,
|
||||
) {
|
||||
defer close(out)
|
||||
|
||||
rowsGenerated := 0
|
||||
currentChunk := make([]UnknownRowValues, 0, chunkSize)
|
||||
|
||||
for i := range totalRows {
|
||||
gdbArchiveOid := startOid + i + 1
|
||||
row := generateSiteHolderAttachRow(gdbArchiveOid)
|
||||
currentChunk = append(currentChunk, row)
|
||||
rowsGenerated++
|
||||
|
||||
if len(currentChunk) == chunkSize {
|
||||
select {
|
||||
case out <- currentChunk:
|
||||
log.Debugf("Sent SITE_HOLDER__ATTACH chunk with %d rows", len(currentChunk))
|
||||
case <-ctx.Done():
|
||||
log.Info("Context cancelled, stopping SITE_HOLDER__ATTACH row generation")
|
||||
return
|
||||
}
|
||||
currentChunk = make([]UnknownRowValues, 0, chunkSize)
|
||||
}
|
||||
|
||||
if rowsGenerated%100_000 == 0 {
|
||||
logSiteHolderAttachSampleRow(rowsGenerated, row)
|
||||
}
|
||||
}
|
||||
|
||||
if len(currentChunk) > 0 {
|
||||
select {
|
||||
case out <- currentChunk:
|
||||
log.Debugf("Sent final SITE_HOLDER__ATTACH chunk with %d rows", len(currentChunk))
|
||||
case <-ctx.Done():
|
||||
log.Info("Context cancelled, stopping SITE_HOLDER__ATTACH row generation")
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Finished generating %d SITE_HOLDER__ATTACH rows", rowsGenerated)
|
||||
}
|
||||
|
||||
func generateSiteHolderAttachRow(gdbArchiveOid int) UnknownRowValues {
|
||||
dateLowerLimit, _ := time.Parse(time.RFC3339, "2020-12-31T23:59:59Z")
|
||||
dateUpperLimit, _ := time.Parse(time.RFC3339, "2025-12-31T23:59:59Z")
|
||||
|
||||
relGlobalID, _ := uuid.New().MarshalBinary()
|
||||
contentType := generateRandomContentType()
|
||||
attName := generateRandomAttachmentName()
|
||||
binaryData := generateRandomBinaryContent()
|
||||
dataSize := len(binaryData)
|
||||
globalID, _ := uuid.New().MarshalBinary()
|
||||
gdbFromDate := generateRandomTimestamp(dateLowerLimit, dateUpperLimit)
|
||||
gdbToDate, _ := time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
|
||||
attachmentID := rand.Intn(10000) + 1
|
||||
|
||||
return UnknownRowValues{
|
||||
gdbArchiveOid,
|
||||
relGlobalID,
|
||||
contentType,
|
||||
attName,
|
||||
dataSize,
|
||||
binaryData,
|
||||
globalID,
|
||||
gdbFromDate,
|
||||
gdbToDate,
|
||||
attachmentID,
|
||||
}
|
||||
}
|
||||
|
||||
func generateRandomContentType() string {
|
||||
contentTypes := []string{
|
||||
"text/plain",
|
||||
"application/pdf",
|
||||
"image/jpeg",
|
||||
"image/png",
|
||||
"application/msword",
|
||||
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
||||
"text/csv",
|
||||
"application/json",
|
||||
}
|
||||
return contentTypes[rand.Intn(len(contentTypes))]
|
||||
}
|
||||
|
||||
func generateRandomAttachmentName() string {
|
||||
extensions := []string{".txt", ".pdf", ".jpg", ".png", ".doc", ".docx", ".csv", ".json"}
|
||||
baseName := generateRandomString(20)
|
||||
extension := extensions[rand.Intn(len(extensions))]
|
||||
return baseName + extension
|
||||
}
|
||||
|
||||
func generateRandomBinaryContent() []byte {
|
||||
sizeOptions := []int{100, 500, 1000, 5000, 10000, 50000, 100000}
|
||||
size := sizeOptions[rand.Intn(len(sizeOptions))]
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
lineCount := rand.Intn(size/50) + 1
|
||||
for range lineCount {
|
||||
line := generateRandomString(rand.Intn(80) + 20)
|
||||
buf.WriteString(line)
|
||||
buf.WriteString("\n")
|
||||
}
|
||||
|
||||
for buf.Len() < size {
|
||||
randomText := generateRandomString(rand.Intn(100) + 50)
|
||||
buf.WriteString(randomText)
|
||||
buf.WriteString("\n")
|
||||
}
|
||||
|
||||
result := buf.Bytes()
|
||||
if len(result) > size {
|
||||
result = result[:size]
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func logSiteHolderAttachSampleRow(id int, rowValues UnknownRowValues) {
|
||||
dataBytes := rowValues[5].([]byte)
|
||||
log.Infof(`
|
||||
Sample SITE_HOLDER__ATTACH row #%d:
|
||||
GDB_ARCHIVE_OID: %v
|
||||
REL_GLOBALID: [binary UUID]
|
||||
CONTENT_TYPE: %v
|
||||
ATT_NAME: %v
|
||||
DATA_SIZE: %v
|
||||
DATA: [%d bytes of binary content]
|
||||
GLOBALID: [binary UUID]
|
||||
GDB_FROM_DATE: %v
|
||||
GDB_TO_DATE: %v
|
||||
ATTACHMENTID: %v
|
||||
`,
|
||||
id,
|
||||
rowValues[0],
|
||||
rowValues[2],
|
||||
rowValues[3],
|
||||
rowValues[4],
|
||||
len(dataBytes),
|
||||
rowValues[7],
|
||||
rowValues[8],
|
||||
rowValues[9],
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user