Compare commits
12 Commits
c4e233401b
...
0c59d06af6
| Author | SHA1 | Date | |
|---|---|---|---|
|
0c59d06af6
|
|||
|
6fa9b21b1c
|
|||
|
a8be31c18b
|
|||
|
5a8bce7701
|
|||
|
b690e580c5
|
|||
|
68d983ea57
|
|||
|
1bc7b67643
|
|||
|
d124da8b20
|
|||
|
b5fd6d0534
|
|||
|
85d7d69da9
|
|||
|
d54108d5e5
|
|||
|
212d3663e2
|
@@ -86,7 +86,7 @@ func main() {
|
|||||||
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
|
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
|
||||||
|
|
||||||
totalDuration := time.Since(startTime)
|
totalDuration := time.Since(startTime)
|
||||||
log.Infof("=== Migration completed successfully! ===")
|
// log.Infof("=== Migration completed successfully! ===")
|
||||||
log.Infof("Total migration time: %v", totalDuration)
|
log.Infof("Total migration time: %v", totalDuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -57,8 +57,6 @@ func processMigrationJob(
|
|||||||
StartTime: time.Now(),
|
StartTime: time.Now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
var rowsRead, rowsLoaded, rowsFailed int64
|
|
||||||
|
|
||||||
var wgQueryColumnTypes errgroup.Group
|
var wgQueryColumnTypes errgroup.Group
|
||||||
var sourceColTypes, targetColTypes []models.ColumnType
|
var sourceColTypes, targetColTypes []models.ColumnType
|
||||||
|
|
||||||
@@ -114,16 +112,13 @@ func processMigrationJob(
|
|||||||
}
|
}
|
||||||
|
|
||||||
chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize)
|
chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize)
|
||||||
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize)
|
chPartitions := make(chan models.Partition)
|
||||||
chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
|
|
||||||
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
|
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
|
||||||
chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
|
chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
|
||||||
|
|
||||||
var wgActivePartitions sync.WaitGroup
|
var wgActivePartitions, wgActiveBatches, wgExtractors, wgTransformers, wgLoaders sync.WaitGroup
|
||||||
var wgActiveBatches sync.WaitGroup
|
var rowsRead, rowsLoaded, rowsFailed int64
|
||||||
var wgExtractors sync.WaitGroup
|
var failedPartitionsCount, failedBatchesLoadCount int32
|
||||||
var wgTransformers sync.WaitGroup
|
|
||||||
var wgLoaders sync.WaitGroup
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
|
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
|
||||||
@@ -133,18 +128,8 @@ func processMigrationJob(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go custom_errors.LoaderErrorHandler(
|
|
||||||
localCtx,
|
|
||||||
job.Retry,
|
|
||||||
job.MaxExtractorBatchErrors,
|
|
||||||
chLoadersErrors,
|
|
||||||
chBatchesTransformed,
|
|
||||||
chJobErrors,
|
|
||||||
&wgActiveBatches,
|
|
||||||
)
|
|
||||||
|
|
||||||
maxExtractors := min(job.MaxExtractors, len(partitions))
|
maxExtractors := min(job.MaxExtractors, len(partitions))
|
||||||
log.Infof("Starting %d extractor(s)...", maxExtractors)
|
log.Infof("Starting %d extractor(s)... (%v)", maxExtractors, job.Name)
|
||||||
|
|
||||||
for range maxExtractors {
|
for range maxExtractors {
|
||||||
wgExtractors.Go(func() {
|
wgExtractors.Go(func() {
|
||||||
@@ -159,6 +144,7 @@ func processMigrationJob(
|
|||||||
chJobErrors,
|
chJobErrors,
|
||||||
&wgActivePartitions,
|
&wgActivePartitions,
|
||||||
&rowsRead,
|
&rowsRead,
|
||||||
|
&failedPartitionsCount,
|
||||||
job.SourceTable.FromJsonColumns,
|
job.SourceTable.FromJsonColumns,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
@@ -171,13 +157,15 @@ func processMigrationJob(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
log.Infof("Starting %d transformer(s)...", maxExtractors)
|
log.Infof("Starting %d transformer(s)... (%v)", maxExtractors, job.Name)
|
||||||
|
|
||||||
for range maxExtractors {
|
for range maxExtractors {
|
||||||
wgTransformers.Go(func() {
|
wgTransformers.Go(func() {
|
||||||
transformer.Exec(
|
transformer.Consume(
|
||||||
localCtx,
|
localCtx,
|
||||||
sourceColTypes,
|
sourceColTypes,
|
||||||
|
job.Retry,
|
||||||
|
job.TransformerBatchSize,
|
||||||
chBatchesRaw,
|
chBatchesRaw,
|
||||||
chBatchesTransformed,
|
chBatchesTransformed,
|
||||||
chJobErrors,
|
chJobErrors,
|
||||||
@@ -186,7 +174,7 @@ func processMigrationJob(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Starting %d loader(s)...", job.MaxLoaders)
|
log.Infof("Starting %d loader(s)... (%v)", job.MaxLoaders, job.Name)
|
||||||
|
|
||||||
for range job.MaxLoaders {
|
for range job.MaxLoaders {
|
||||||
wgLoaders.Go(func() {
|
wgLoaders.Go(func() {
|
||||||
@@ -195,39 +183,39 @@ func processMigrationJob(
|
|||||||
job.TargetTable,
|
job.TargetTable,
|
||||||
targetColTypes,
|
targetColTypes,
|
||||||
job.Retry,
|
job.Retry,
|
||||||
|
job.LoaderBatchSize,
|
||||||
chBatchesTransformed,
|
chBatchesTransformed,
|
||||||
chJobErrors,
|
chJobErrors,
|
||||||
&wgActiveBatches,
|
&wgActiveBatches,
|
||||||
&rowsLoaded,
|
&rowsLoaded,
|
||||||
|
&failedBatchesLoadCount,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Debugf("Waiting for goroutines (%v)", job.Name)
|
// log.Debugf("Waiting for goroutines (%v)", job.Name)
|
||||||
|
|
||||||
wgActivePartitions.Wait()
|
wgActivePartitions.Wait()
|
||||||
log.Debugf("wgActivePartitions is empty (%v)", job.Name)
|
// log.Debugf("wgActivePartitions is empty (%v)", job.Name)
|
||||||
close(chPartitions)
|
close(chPartitions)
|
||||||
log.Debugf("chPartitions is closed (%v)", job.Name)
|
// log.Debugf("chPartitions is closed (%v)", job.Name)
|
||||||
|
|
||||||
wgExtractors.Wait()
|
wgExtractors.Wait()
|
||||||
log.Debugf("wgExtractors is empty (%v)", job.Name)
|
// log.Debugf("wgExtractors is empty (%v)", job.Name)
|
||||||
close(chBatchesRaw)
|
close(chBatchesRaw)
|
||||||
log.Debugf("chBatchesRaw is closed (%v)", job.Name)
|
// log.Debugf("chBatchesRaw is closed (%v)", job.Name)
|
||||||
|
|
||||||
wgTransformers.Wait()
|
wgTransformers.Wait()
|
||||||
log.Debugf("wgTransformers is empty (%v)", job.Name)
|
// log.Debugf("wgTransformers is empty (%v)", job.Name)
|
||||||
|
close(chBatchesTransformed)
|
||||||
|
// log.Debugf("chBatchesTransformed is closed (%v)", job.Name)
|
||||||
|
|
||||||
wgActiveBatches.Wait()
|
wgActiveBatches.Wait()
|
||||||
log.Debugf("wgActiveBatches is empty (%v)", job.Name)
|
// log.Debugf("wgActiveBatches is empty (%v)", job.Name)
|
||||||
close(chBatchesTransformed)
|
|
||||||
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
|
|
||||||
close(chLoadersErrors)
|
|
||||||
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
|
|
||||||
|
|
||||||
wgLoaders.Wait()
|
wgLoaders.Wait()
|
||||||
log.Debugf("wgLoaders is empty (%v)", job.Name)
|
// log.Debugf("wgLoaders is empty (%v)", job.Name)
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
}()
|
}()
|
||||||
@@ -239,9 +227,9 @@ func processMigrationJob(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("waiting for local context to be done (%v)", job.Name)
|
// log.Debugf("waiting for local context to be done (%v)", job.Name)
|
||||||
<-localCtx.Done()
|
<-localCtx.Done()
|
||||||
log.Debugf("local context done (%v)", job.Name)
|
// log.Debugf("local context done (%v)", job.Name)
|
||||||
|
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
result.Error = ctx.Err()
|
result.Error = ctx.Err()
|
||||||
@@ -256,5 +244,9 @@ func processMigrationJob(
|
|||||||
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
|
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if result.RowsRead == 0 {
|
||||||
|
log.Warnf("No rows extracted from (%v)", job.Name)
|
||||||
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|||||||
21
config.yaml
21
config.yaml
@@ -3,24 +3,24 @@ source_db_type: sqlserver
|
|||||||
target_db_type: postgres
|
target_db_type: postgres
|
||||||
|
|
||||||
defaults:
|
defaults:
|
||||||
batches_per_partition: 8
|
batches_per_partition: 4
|
||||||
max_extractors: 2
|
max_extractors: 2
|
||||||
extractor_batch_size: 25000
|
extractor_batch_size: 5000
|
||||||
extractor_queue_size: 8
|
extractor_queue_size: 8
|
||||||
max_transformers: 2
|
max_transformers: 2
|
||||||
transformer_batch_size: 25000
|
transformer_batch_size: 12500
|
||||||
transformer_queue_size: 8
|
transformer_queue_size: 8
|
||||||
max_loaders: 4
|
max_loaders: 4
|
||||||
loader_batch_size: 25000
|
loader_batch_size: 25000
|
||||||
truncate_target: true
|
truncate_target: true
|
||||||
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
truncate_method: TRUNCATE # TRUNCATE | DELETE
|
||||||
max_partition_errrors: 5
|
|
||||||
max_extractor_batch_errors: 5
|
|
||||||
retry:
|
retry:
|
||||||
attempts: 3
|
attempts: 3
|
||||||
base_delay_ms: 500
|
base_delay_ms: 500
|
||||||
max_delay_ms: 10000
|
max_delay_ms: 10000
|
||||||
max_jitter_ms: 500
|
max_jitter_ms: 500
|
||||||
|
max_failed_partitions: 5
|
||||||
|
max_failed_batches_load: 5
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
- name: cartografia_manzana
|
- name: cartografia_manzana
|
||||||
@@ -70,10 +70,15 @@ jobs:
|
|||||||
- source: DATA
|
- source: DATA
|
||||||
target: FILE_URL
|
target: FILE_URL
|
||||||
mode: REFERENCE_ONLY
|
mode: REFERENCE_ONLY
|
||||||
max_extractors: 8
|
batches_per_partition: 20
|
||||||
|
max_extractors: 32
|
||||||
|
extractor_batch_size: 1
|
||||||
|
extractor_queue_size: 100
|
||||||
|
max_transformers: 48
|
||||||
|
transformer_batch_size: 500
|
||||||
|
transformer_queue_size: 8
|
||||||
max_loaders: 4
|
max_loaders: 4
|
||||||
queue_size: 32
|
loader_batch_size: 500
|
||||||
batch_size: 1
|
|
||||||
retry:
|
retry:
|
||||||
attempts: 5
|
attempts: 5
|
||||||
base_delay_ms: 1000
|
base_delay_ms: 1000
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ type RetryConfig struct {
|
|||||||
BaseDelayMs int `yaml:"base_delay_ms"`
|
BaseDelayMs int `yaml:"base_delay_ms"`
|
||||||
MaxDelayMs int `yaml:"max_delay_ms"`
|
MaxDelayMs int `yaml:"max_delay_ms"`
|
||||||
MaxJitterMs int `yaml:"max_jitter_ms"`
|
MaxJitterMs int `yaml:"max_jitter_ms"`
|
||||||
|
MaxFailedPartitions int `yaml:"max_failed_partitions"`
|
||||||
|
MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ToStorageColumnConfig struct {
|
type ToStorageColumnConfig struct {
|
||||||
@@ -36,8 +38,6 @@ type JobConfig struct {
|
|||||||
LoaderBatchSize int `yaml:"loader_batch_size"`
|
LoaderBatchSize int `yaml:"loader_batch_size"`
|
||||||
TruncateTarget bool `yaml:"truncate_target"`
|
TruncateTarget bool `yaml:"truncate_target"`
|
||||||
TruncateMethod string `yaml:"truncate_method"`
|
TruncateMethod string `yaml:"truncate_method"`
|
||||||
MaxPartitionErrrors int `yaml:"max_partition_errrors"`
|
|
||||||
MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"`
|
|
||||||
Retry RetryConfig `yaml:"retry"`
|
Retry RetryConfig `yaml:"retry"`
|
||||||
RowsPerPartition int64
|
RowsPerPartition int64
|
||||||
ToStorage ToStorageConfig `yaml:"to_storage"`
|
ToStorage ToStorageConfig `yaml:"to_storage"`
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package custom_errors
|
package custom_errors
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -40,22 +39,3 @@ func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJ
|
|||||||
|
|
||||||
return delay
|
return delay
|
||||||
}
|
}
|
||||||
|
|
||||||
func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) {
|
|
||||||
if delay <= 0 {
|
|
||||||
enqueue()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
timer := time.NewTimer(delay)
|
|
||||||
defer timer.Stop()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-timer.C:
|
|
||||||
enqueue()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,107 +0,0 @@
|
|||||||
package custom_errors
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
type LoaderError struct {
|
|
||||||
Batch models.Batch
|
|
||||||
Msg string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *LoaderError) Error() string {
|
|
||||||
return e.Msg
|
|
||||||
}
|
|
||||||
|
|
||||||
func LoaderErrorHandler(
|
|
||||||
ctx context.Context,
|
|
||||||
retryConfig config.RetryConfig,
|
|
||||||
maxChunkErrors int,
|
|
||||||
chErrorsIn <-chan LoaderError,
|
|
||||||
chBatchesOut chan<- models.Batch,
|
|
||||||
chJobErrorsOut chan<- JobError,
|
|
||||||
wgActiveBatches *sync.WaitGroup,
|
|
||||||
) {
|
|
||||||
definitiveErrors := 0
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case err, ok := <-chErrorsIn:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err.Batch.RetryCounter >= retryConfig.Attempts {
|
|
||||||
wgActiveBatches.Done()
|
|
||||||
definitiveErrors++
|
|
||||||
jobError := JobError{
|
|
||||||
ShouldCancelJob: false,
|
|
||||||
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
|
|
||||||
Prev: &err,
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chJobErrorsOut <- jobError:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
|
|
||||||
fatalError := JobError{
|
|
||||||
ShouldCancelJob: true,
|
|
||||||
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
|
|
||||||
Prev: &err,
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chJobErrorsOut <- fatalError:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
jobError := JobError{
|
|
||||||
ShouldCancelJob: false,
|
|
||||||
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter),
|
|
||||||
Prev: &err,
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chJobErrorsOut <- jobError:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err.Batch.RetryCounter++
|
|
||||||
delay := ComputeBackoffDelay(
|
|
||||||
err.Batch.RetryCounter,
|
|
||||||
retryConfig.BaseDelayMs,
|
|
||||||
retryConfig.MaxDelayMs,
|
|
||||||
retryConfig.MaxJitterMs,
|
|
||||||
)
|
|
||||||
|
|
||||||
requeueWithBackoff(ctx, delay, func() {
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- err.Batch:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -14,3 +14,12 @@ type ExtractorError struct {
|
|||||||
func (e *ExtractorError) Error() string {
|
func (e *ExtractorError) Error() string {
|
||||||
return e.Msg
|
return e.Msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type LoaderError struct {
|
||||||
|
Batch models.Batch
|
||||||
|
Msg string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *LoaderError) Error() string {
|
||||||
|
return e.Msg
|
||||||
|
}
|
||||||
@@ -10,7 +10,6 @@ import (
|
|||||||
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
mssql "github.com/microsoft/go-mssqldb"
|
mssql "github.com/microsoft/go-mssqldb"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -188,8 +187,6 @@ func buildExtractQueryMssql(q ExtractionQuery) (string, error) {
|
|||||||
hasRegularColumns := len(q.Columns) > 0
|
hasRegularColumns := len(q.Columns) > 0
|
||||||
hasJsonColumns := len(q.FromJsonColumns) > 0
|
hasJsonColumns := len(q.FromJsonColumns) > 0
|
||||||
|
|
||||||
// logrus.Debugf("Extraction query: %+v", q)
|
|
||||||
|
|
||||||
resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
|
resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
|
||||||
if hasJsonColumns {
|
if hasJsonColumns {
|
||||||
for _, jsonConfig := range q.FromJsonColumns {
|
for _, jsonConfig := range q.FromJsonColumns {
|
||||||
@@ -296,7 +293,7 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Query: %s", queryString)
|
// logrus.Debugf("Query: %s", queryString)
|
||||||
|
|
||||||
var queryArgs []any
|
var queryArgs []any
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume(
|
|||||||
chErrorsOut chan<- custom_errors.JobError,
|
chErrorsOut chan<- custom_errors.JobError,
|
||||||
wgActivePartitions *sync.WaitGroup,
|
wgActivePartitions *sync.WaitGroup,
|
||||||
rowsRead *int64,
|
rowsRead *int64,
|
||||||
|
failedPartitionsCount *int32,
|
||||||
fromJsonColumns []config.FromJsonItem,
|
fromJsonColumns []config.FromJsonItem,
|
||||||
) {
|
) {
|
||||||
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
|
||||||
@@ -72,11 +73,12 @@ func (ex *GenericExtractor) Consume(
|
|||||||
|
|
||||||
if rowsReadResult > 0 {
|
if rowsReadResult > 0 {
|
||||||
current := atomic.LoadInt64(rowsRead)
|
current := atomic.LoadInt64(rowsRead)
|
||||||
logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
|
logrus.Debugf("Rows read (partition extracted): +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
|
||||||
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
atomic.AddInt64(rowsRead, int64(rowsReadResult))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
atomic.AddInt32(failedPartitionsCount, 1)
|
||||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -90,6 +92,16 @@ func (ex *GenericExtractor) Consume(
|
|||||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
currentFPCount := atomic.LoadInt32(failedPartitionsCount)
|
||||||
|
if currentFPCount > int32(retryConfig.MaxFailedPartitions) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed partitions reached"}:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
|
|||||||
|
|
||||||
if currentParitition.RetryCounter >= retryConfig.Attempts {
|
if currentParitition.RetryCounter >= retryConfig.Attempts {
|
||||||
return totalRowsRead, &custom_errors.JobError{
|
return totalRowsRead, &custom_errors.JobError{
|
||||||
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id),
|
Msg: fmt.Sprintf("Partition %v reached max retries (%d)", currentParitition.Id, currentParitition.RetryCounter),
|
||||||
Prev: err,
|
Prev: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ import (
|
|||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (gl *GenericLoader) Consume(
|
func (gl *GenericLoader) Consume(
|
||||||
@@ -16,15 +18,77 @@ func (gl *GenericLoader) Consume(
|
|||||||
tableInfo config.TargetTableInfo,
|
tableInfo config.TargetTableInfo,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
retryConfig config.RetryConfig,
|
retryConfig config.RetryConfig,
|
||||||
|
batchSize int,
|
||||||
chBatchesIn <-chan models.Batch,
|
chBatchesIn <-chan models.Batch,
|
||||||
chErrorsOut chan<- custom_errors.JobError,
|
chErrorsOut chan<- custom_errors.JobError,
|
||||||
wgActiveBatches *sync.WaitGroup,
|
wgActiveBatches *sync.WaitGroup,
|
||||||
rowsLoaded *int64,
|
rowsLoaded *int64,
|
||||||
|
failedBatchesCount *int32,
|
||||||
) {
|
) {
|
||||||
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
colNames := mapSlice(columns, func(col models.ColumnType) string {
|
||||||
return col.Name()
|
return col.Name()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var accRows []models.UnknownRowValues
|
||||||
|
var parentBatchesId []uuid.UUID
|
||||||
|
pendingDone := 0
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
for range pendingDone {
|
||||||
|
wgActiveBatches.Done()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
flush := func() bool {
|
||||||
|
if len(accRows) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
count := len(parentBatchesId)
|
||||||
|
superBatch := models.Batch{
|
||||||
|
Id: uuid.New(),
|
||||||
|
ParentBatchesId: parentBatchesId,
|
||||||
|
Rows: accRows,
|
||||||
|
}
|
||||||
|
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, superBatch)
|
||||||
|
for range count {
|
||||||
|
wgActiveBatches.Done()
|
||||||
|
}
|
||||||
|
pendingDone -= count
|
||||||
|
accRows = nil
|
||||||
|
parentBatchesId = nil
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
atomic.AddInt32(failedBatchesCount, 1)
|
||||||
|
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false
|
||||||
|
case chErrorsOut <- *jobError:
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false
|
||||||
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
current := atomic.LoadInt64(rowsLoaded)
|
||||||
|
logrus.Debugf("Rows loaded (batch loaded): +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
|
||||||
|
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return
|
return
|
||||||
@@ -35,13 +99,16 @@ func (gl *GenericLoader) Consume(
|
|||||||
return
|
return
|
||||||
case batch, ok := <-chBatchesIn:
|
case batch, ok := <-chBatchesIn:
|
||||||
if !ok {
|
if !ok {
|
||||||
|
flush()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if batchSize <= 0 {
|
||||||
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
|
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
|
||||||
wgActiveBatches.Done()
|
wgActiveBatches.Done()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
atomic.AddInt32(failedBatchesCount, 1)
|
||||||
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -55,8 +122,32 @@ func (gl *GenericLoader) Consume(
|
|||||||
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
|
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
current := atomic.LoadInt64(rowsLoaded)
|
||||||
|
logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
|
||||||
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
atomic.AddInt64(rowsLoaded, int64(processedRows))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
pendingDone++
|
||||||
|
accRows = append(accRows, batch.Rows...)
|
||||||
|
parentBatchesId = append(parentBatchesId, batch.Id)
|
||||||
|
|
||||||
|
if len(accRows) >= batchSize {
|
||||||
|
if !flush() {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,8 +29,7 @@ func (gl *GenericLoader) ProcessBatchWithRetries(
|
|||||||
|
|
||||||
if batch.RetryCounter >= retryConfig.Attempts {
|
if batch.RetryCounter >= retryConfig.Attempts {
|
||||||
return rowsLoaded, &custom_errors.JobError{
|
return rowsLoaded, &custom_errors.JobError{
|
||||||
ShouldCancelJob: false,
|
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", batch.Id, batch.RetryCounter),
|
||||||
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", btError.Batch.Id, btError.Batch.RetryCounter),
|
|
||||||
Prev: btError,
|
Prev: btError,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
117
internal/app/etl/transformers/consume.go
Normal file
117
internal/app/etl/transformers/consume.go
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
package transformers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (mssqlTr *MssqlTransformer) Consume(
|
||||||
|
ctx context.Context,
|
||||||
|
columns []models.ColumnType,
|
||||||
|
retryConfig config.RetryConfig,
|
||||||
|
batchSize int,
|
||||||
|
chBatchesIn <-chan models.Batch,
|
||||||
|
chBatchesOut chan<- models.Batch,
|
||||||
|
chJobErrorsOut chan<- custom_errors.JobError,
|
||||||
|
wgActiveBatches *sync.WaitGroup,
|
||||||
|
) {
|
||||||
|
transformationPlan := computeTransformationPlan(columns)
|
||||||
|
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
|
||||||
|
transformationPlan = append(transformationPlan, storagePlan...)
|
||||||
|
|
||||||
|
var accRows []models.UnknownRowValues
|
||||||
|
var parentBatchesId []uuid.UUID
|
||||||
|
var firstPartitionId uuid.UUID
|
||||||
|
|
||||||
|
flush := func() bool {
|
||||||
|
if len(accRows) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
out := models.Batch{
|
||||||
|
Id: uuid.New(),
|
||||||
|
PartitionId: firstPartitionId,
|
||||||
|
ParentBatchesId: parentBatchesId,
|
||||||
|
Rows: accRows,
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case chBatchesOut <- out:
|
||||||
|
wgActiveBatches.Add(1)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
accRows = nil
|
||||||
|
parentBatchesId = nil
|
||||||
|
firstPartitionId = uuid.Nil
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case batch, ok := <-chBatchesIn:
|
||||||
|
if !ok {
|
||||||
|
flush()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(transformationPlan) > 0 {
|
||||||
|
err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ctx.Err()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||||
|
select {
|
||||||
|
case chJobErrorsOut <- *jobError:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
select {
|
||||||
|
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if batchSize <= 0 {
|
||||||
|
select {
|
||||||
|
case chBatchesOut <- batch:
|
||||||
|
wgActiveBatches.Add(1)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parentBatchesId) == 0 {
|
||||||
|
firstPartitionId = batch.PartitionId
|
||||||
|
}
|
||||||
|
accRows = append(accRows, batch.Rows...)
|
||||||
|
parentBatchesId = append(parentBatchesId, batch.Id)
|
||||||
|
|
||||||
|
if len(accRows) >= batchSize {
|
||||||
|
if !flush() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,20 +1,9 @@
|
|||||||
package transformers
|
package transformers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type MssqlTransformer struct {
|
type MssqlTransformer struct {
|
||||||
@@ -30,196 +19,3 @@ func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.So
|
|||||||
azureClient: azureClient,
|
azureClient: azureClient,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
|
||||||
var plan []etl.ColumnTransformPlan
|
|
||||||
|
|
||||||
for i, col := range columns {
|
|
||||||
switch col.SystemType() {
|
|
||||||
case "uniqueidentifier":
|
|
||||||
plan = append(plan, etl.ColumnTransformPlan{
|
|
||||||
Index: i,
|
|
||||||
Fn: func(v any) (any, error) {
|
|
||||||
if b, ok := v.([]byte); ok && b != nil {
|
|
||||||
return mssqlUuidToBigEndian(b)
|
|
||||||
}
|
|
||||||
return v, nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
case "geometry", "geography":
|
|
||||||
plan = append(plan, etl.ColumnTransformPlan{
|
|
||||||
Index: i,
|
|
||||||
Fn: func(v any) (any, error) {
|
|
||||||
if b, ok := v.([]byte); ok && b != nil {
|
|
||||||
return wkbToEwkbWithSrid(b, 4326)
|
|
||||||
}
|
|
||||||
return v, nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
case "datetime", "datetime2":
|
|
||||||
plan = append(plan, etl.ColumnTransformPlan{
|
|
||||||
Index: i,
|
|
||||||
Fn: func(v any) (any, error) {
|
|
||||||
if t, ok := v.(time.Time); ok {
|
|
||||||
return ensureUTC(t), nil
|
|
||||||
}
|
|
||||||
return v, nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return plan
|
|
||||||
}
|
|
||||||
|
|
||||||
func computeStorageTransformationPlan(
|
|
||||||
ctx context.Context,
|
|
||||||
azureClient *azure.Client,
|
|
||||||
toStorage config.ToStorageConfig,
|
|
||||||
sourceColumns []models.ColumnType,
|
|
||||||
sourceTable config.SourceTableInfo,
|
|
||||||
) []etl.ColumnTransformPlan {
|
|
||||||
if azureClient == nil || len(toStorage.Columns) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
colIndex := make(map[string]int, len(sourceColumns))
|
|
||||||
for i, col := range sourceColumns {
|
|
||||||
colIndex[strings.ToUpper(col.Name())] = i
|
|
||||||
}
|
|
||||||
|
|
||||||
var plan []etl.ColumnTransformPlan
|
|
||||||
for _, storageCol := range toStorage.Columns {
|
|
||||||
if storageCol.Mode != "REFERENCE_ONLY" {
|
|
||||||
log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
|
|
||||||
if !ok {
|
|
||||||
log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
sourceColName := storageCol.Source
|
|
||||||
schema := sourceTable.Schema
|
|
||||||
table := sourceTable.Table
|
|
||||||
|
|
||||||
plan = append(plan, etl.ColumnTransformPlan{
|
|
||||||
Index: idx,
|
|
||||||
Fn: func(v any) (any, error) {
|
|
||||||
if v == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
b, ok := v.([]byte)
|
|
||||||
if !ok {
|
|
||||||
log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
|
|
||||||
schema, table, sourceColName, v)
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
start := time.Now()
|
|
||||||
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
|
|
||||||
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
|
|
||||||
}
|
|
||||||
log.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
|
|
||||||
return blobURL, nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return plan
|
|
||||||
}
|
|
||||||
|
|
||||||
const processBatchCtxCheck = 4096
|
|
||||||
|
|
||||||
func (mssqlTr *MssqlTransformer) ProcessBatch(
|
|
||||||
ctx context.Context,
|
|
||||||
batch *models.Batch,
|
|
||||||
transformationPlan []etl.ColumnTransformPlan,
|
|
||||||
) error {
|
|
||||||
for i, rowValues := range batch.Rows {
|
|
||||||
if i%processBatchCtxCheck == 0 {
|
|
||||||
if err := ctx.Err(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, task := range transformationPlan {
|
|
||||||
val := rowValues[task.Index]
|
|
||||||
if val == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
transformed, err := task.Fn(val)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rowValues[task.Index] = transformed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mssqlTr *MssqlTransformer) Exec(
|
|
||||||
ctx context.Context,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
chBatchesIn <-chan models.Batch,
|
|
||||||
chBatchesOut chan<- models.Batch,
|
|
||||||
chJobErrorsOut chan<- custom_errors.JobError,
|
|
||||||
wgActiveBatches *sync.WaitGroup,
|
|
||||||
) {
|
|
||||||
transformationPlan := computeTransformationPlan(columns)
|
|
||||||
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
|
|
||||||
transformationPlan = append(transformationPlan, storagePlan...)
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case batch, ok := <-chBatchesIn:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(transformationPlan) == 0 {
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- batch:
|
|
||||||
wgActiveBatches.Add(1)
|
|
||||||
continue
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, ctx.Err()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case chBatchesOut <- batch:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
wgActiveBatches.Add(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
122
internal/app/etl/transformers/plan.go
Normal file
122
internal/app/etl/transformers/plan.go
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
package transformers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
|
||||||
|
var plan []etl.ColumnTransformPlan
|
||||||
|
|
||||||
|
for i, col := range columns {
|
||||||
|
switch col.SystemType() {
|
||||||
|
case "uniqueidentifier":
|
||||||
|
plan = append(plan, etl.ColumnTransformPlan{
|
||||||
|
Index: i,
|
||||||
|
Fn: func(v any) (any, error) {
|
||||||
|
if b, ok := v.([]byte); ok && b != nil {
|
||||||
|
return mssqlUuidToBigEndian(b)
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
case "geometry", "geography":
|
||||||
|
plan = append(plan, etl.ColumnTransformPlan{
|
||||||
|
Index: i,
|
||||||
|
Fn: func(v any) (any, error) {
|
||||||
|
if b, ok := v.([]byte); ok && b != nil {
|
||||||
|
return wkbToEwkbWithSrid(b, 4326)
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
case "datetime", "datetime2":
|
||||||
|
plan = append(plan, etl.ColumnTransformPlan{
|
||||||
|
Index: i,
|
||||||
|
Fn: func(v any) (any, error) {
|
||||||
|
if t, ok := v.(time.Time); ok {
|
||||||
|
return ensureUTC(t), nil
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return plan
|
||||||
|
}
|
||||||
|
|
||||||
|
func computeStorageTransformationPlan(
|
||||||
|
ctx context.Context,
|
||||||
|
azureClient *azure.Client,
|
||||||
|
toStorage config.ToStorageConfig,
|
||||||
|
sourceColumns []models.ColumnType,
|
||||||
|
sourceTable config.SourceTableInfo,
|
||||||
|
) []etl.ColumnTransformPlan {
|
||||||
|
if azureClient == nil || len(toStorage.Columns) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
colIndex := make(map[string]int, len(sourceColumns))
|
||||||
|
for i, col := range sourceColumns {
|
||||||
|
colIndex[strings.ToUpper(col.Name())] = i
|
||||||
|
}
|
||||||
|
|
||||||
|
var plan []etl.ColumnTransformPlan
|
||||||
|
for _, storageCol := range toStorage.Columns {
|
||||||
|
if storageCol.Mode != "REFERENCE_ONLY" {
|
||||||
|
logrus.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
|
||||||
|
if !ok {
|
||||||
|
logrus.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceColName := storageCol.Source
|
||||||
|
schema := sourceTable.Schema
|
||||||
|
table := sourceTable.Table
|
||||||
|
|
||||||
|
plan = append(plan, etl.ColumnTransformPlan{
|
||||||
|
Index: idx,
|
||||||
|
Fn: func(v any) (any, error) {
|
||||||
|
if v == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
b, ok := v.([]byte)
|
||||||
|
if !ok {
|
||||||
|
logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
|
||||||
|
schema, table, sourceColName, v)
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
// start := time.Now()
|
||||||
|
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
|
||||||
|
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
|
||||||
|
if err != nil {
|
||||||
|
return nil, &custom_errors.JobError{
|
||||||
|
Msg: fmt.Sprintf("Error uploading %s.%s.%s", schema, table, sourceColName),
|
||||||
|
Prev: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// logrus.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
|
||||||
|
return blobURL, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return plan
|
||||||
|
}
|
||||||
73
internal/app/etl/transformers/process-with-retries.go
Normal file
73
internal/app/etl/transformers/process-with-retries.go
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
package transformers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
|
||||||
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
const processBatchCtxCheck = 4096
|
||||||
|
|
||||||
|
func ProcessBatchWithRetries(
|
||||||
|
ctx context.Context,
|
||||||
|
batch *models.Batch,
|
||||||
|
transformationPlan []etl.ColumnTransformPlan,
|
||||||
|
retryConfig config.RetryConfig,
|
||||||
|
) error {
|
||||||
|
for i, rowValues := range batch.Rows {
|
||||||
|
if i%processBatchCtxCheck == 0 {
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, task := range transformationPlan {
|
||||||
|
val := rowValues[task.Index]
|
||||||
|
if val == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
|
success := false
|
||||||
|
|
||||||
|
for attempt := 0; attempt < retryConfig.Attempts; attempt++ {
|
||||||
|
transformed, err := task.Fn(val)
|
||||||
|
if err == nil {
|
||||||
|
rowValues[task.Index] = transformed
|
||||||
|
success = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
lastErr = err
|
||||||
|
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
|
||||||
|
if jobError.ShouldCancelJob {
|
||||||
|
return jobError
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if attempt == retryConfig.Attempts-1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := custom_errors.ComputeBackoffDelay(
|
||||||
|
attempt,
|
||||||
|
retryConfig.BaseDelayMs,
|
||||||
|
retryConfig.MaxDelayMs,
|
||||||
|
retryConfig.MaxJitterMs,
|
||||||
|
)
|
||||||
|
time.Sleep(delay)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !success {
|
||||||
|
return lastErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -1 +0,0 @@
|
|||||||
package transformers
|
|
||||||
@@ -9,18 +9,6 @@ import (
|
|||||||
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Extractor interface {
|
|
||||||
ProcessPartition(
|
|
||||||
ctx context.Context,
|
|
||||||
tableInfo config.SourceTableInfo,
|
|
||||||
columns []models.ColumnType,
|
|
||||||
batchSize int,
|
|
||||||
partition models.Partition,
|
|
||||||
indexPrimaryKey int,
|
|
||||||
chBatchesOut chan<- models.Batch,
|
|
||||||
) (int, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type TransformerFunc func(any) (any, error)
|
type TransformerFunc func(any) (any, error)
|
||||||
|
|
||||||
type ColumnTransformPlan struct {
|
type ColumnTransformPlan struct {
|
||||||
@@ -29,31 +17,18 @@ type ColumnTransformPlan struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Transformer interface {
|
type Transformer interface {
|
||||||
ProcessBatch(
|
Consume(
|
||||||
ctx context.Context,
|
|
||||||
batch *models.Batch,
|
|
||||||
transformationPlan []ColumnTransformPlan,
|
|
||||||
) error
|
|
||||||
|
|
||||||
Exec(
|
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
columns []models.ColumnType,
|
columns []models.ColumnType,
|
||||||
|
retryConfig config.RetryConfig,
|
||||||
|
batchSize int,
|
||||||
chBatchesIn <-chan models.Batch,
|
chBatchesIn <-chan models.Batch,
|
||||||
chBactchesOut chan<- models.Batch,
|
chBatchesOut chan<- models.Batch,
|
||||||
chJobErrorsOut chan<- custom_errors.JobError,
|
chJobErrorsOut chan<- custom_errors.JobError,
|
||||||
wgActiveBatches *sync.WaitGroup,
|
wgActiveBatches *sync.WaitGroup,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Loader interface {
|
|
||||||
ProcessBatch(
|
|
||||||
ctx context.Context,
|
|
||||||
tableInfo config.TargetTableInfo,
|
|
||||||
colNames []string,
|
|
||||||
batch models.Batch,
|
|
||||||
) (int64, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type TableAnalyzer interface {
|
type TableAnalyzer interface {
|
||||||
QueryColumnTypes(
|
QueryColumnTypes(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ type UnknownRowValues = []any
|
|||||||
type Batch struct {
|
type Batch struct {
|
||||||
Id uuid.UUID
|
Id uuid.UUID
|
||||||
PartitionId uuid.UUID
|
PartitionId uuid.UUID
|
||||||
|
ParentBatchesId []uuid.UUID
|
||||||
Rows []UnknownRowValues
|
Rows []UnknownRowValues
|
||||||
RetryCounter int
|
RetryCounter int
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,8 +12,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
totalRows int = 1_000_000
|
// totalRows int = 1_000_000
|
||||||
chunkSize int = 50_000
|
totalRows int = 1000
|
||||||
|
chunkSize int = 200
|
||||||
queueSize int = 4
|
queueSize int = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -40,6 +41,14 @@ func main() {
|
|||||||
seedManzanas(ctx, db)
|
seedManzanas(ctx, db)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
wgSeed.Go(func() {
|
||||||
|
seedPuertos(ctx, db)
|
||||||
|
})
|
||||||
|
|
||||||
|
wgSeed.Go(func() {
|
||||||
|
seedSiteHolderAttach(ctx, db)
|
||||||
|
})
|
||||||
|
|
||||||
wgSeed.Wait()
|
wgSeed.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
227
scripts/mssql-copy-in/site-holder-attach.go
Normal file
227
scripts/mssql-copy-in/site-holder-attach.go
Normal file
@@ -0,0 +1,227 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var siteHolderAttachJob = MigrationJob{
|
||||||
|
Schema: "Infraestructura",
|
||||||
|
Table: "SITE_HOLDER__ATTACH",
|
||||||
|
}
|
||||||
|
|
||||||
|
func seedSiteHolderAttach(ctx context.Context, db *sql.DB) error {
|
||||||
|
maxOid, err := getMaxGDBArchiveOidForAttach(ctx, db)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Error getting max GDB_ARCHIVE_OID: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Starting SITE_HOLDER__ATTACH data generation from GDB_ARCHIVE_OID: %d", maxOid+1)
|
||||||
|
|
||||||
|
rowsChan := make(chan []UnknownRowValues, queueSize)
|
||||||
|
|
||||||
|
var wgRowGenerator sync.WaitGroup
|
||||||
|
|
||||||
|
wgRowGenerator.Go(func() {
|
||||||
|
generateSiteHolderAttachRows(ctx, maxOid, totalRows, chunkSize, rowsChan)
|
||||||
|
})
|
||||||
|
|
||||||
|
columns := []string{
|
||||||
|
"GDB_ARCHIVE_OID",
|
||||||
|
"REL_GLOBALID",
|
||||||
|
"CONTENT_TYPE",
|
||||||
|
"ATT_NAME",
|
||||||
|
"DATA_SIZE",
|
||||||
|
"DATA",
|
||||||
|
"GLOBALID",
|
||||||
|
"GDB_FROM_DATE",
|
||||||
|
"GDB_TO_DATE",
|
||||||
|
"ATTACHMENTID",
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := loadRowsMssql(ctx, siteHolderAttachJob, columns, db, rowsChan); err != nil {
|
||||||
|
return fmt.Errorf("Error loading rows (SITE_HOLDER__ATTACH): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Data generation and loading completed successfully (SITE_HOLDER__ATTACH)")
|
||||||
|
wgRowGenerator.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMaxGDBArchiveOidForAttach(ctx context.Context, db *sql.DB) (int, error) {
|
||||||
|
var maxOid sql.NullInt64
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0)
|
||||||
|
FROM [%s].[%s]
|
||||||
|
`, siteHolderAttachJob.Schema, siteHolderAttachJob.Table)
|
||||||
|
|
||||||
|
err := db.QueryRowContext(ctx, query).Scan(&maxOid)
|
||||||
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !maxOid.Valid {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return int(maxOid.Int64), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateSiteHolderAttachRows(
|
||||||
|
ctx context.Context,
|
||||||
|
startOid int,
|
||||||
|
totalRows int,
|
||||||
|
chunkSize int,
|
||||||
|
out chan<- []UnknownRowValues,
|
||||||
|
) {
|
||||||
|
defer close(out)
|
||||||
|
|
||||||
|
rowsGenerated := 0
|
||||||
|
currentChunk := make([]UnknownRowValues, 0, chunkSize)
|
||||||
|
|
||||||
|
for i := range totalRows {
|
||||||
|
gdbArchiveOid := startOid + i + 1
|
||||||
|
row := generateSiteHolderAttachRow(gdbArchiveOid)
|
||||||
|
currentChunk = append(currentChunk, row)
|
||||||
|
rowsGenerated++
|
||||||
|
|
||||||
|
if len(currentChunk) == chunkSize {
|
||||||
|
select {
|
||||||
|
case out <- currentChunk:
|
||||||
|
log.Debugf("Sent SITE_HOLDER__ATTACH chunk with %d rows", len(currentChunk))
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Info("Context cancelled, stopping SITE_HOLDER__ATTACH row generation")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
currentChunk = make([]UnknownRowValues, 0, chunkSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rowsGenerated%100_000 == 0 {
|
||||||
|
logSiteHolderAttachSampleRow(rowsGenerated, row)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(currentChunk) > 0 {
|
||||||
|
select {
|
||||||
|
case out <- currentChunk:
|
||||||
|
log.Debugf("Sent final SITE_HOLDER__ATTACH chunk with %d rows", len(currentChunk))
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Info("Context cancelled, stopping SITE_HOLDER__ATTACH row generation")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Finished generating %d SITE_HOLDER__ATTACH rows", rowsGenerated)
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateSiteHolderAttachRow(gdbArchiveOid int) UnknownRowValues {
|
||||||
|
dateLowerLimit, _ := time.Parse(time.RFC3339, "2020-12-31T23:59:59Z")
|
||||||
|
dateUpperLimit, _ := time.Parse(time.RFC3339, "2025-12-31T23:59:59Z")
|
||||||
|
|
||||||
|
relGlobalID, _ := uuid.New().MarshalBinary()
|
||||||
|
contentType := generateRandomContentType()
|
||||||
|
attName := generateRandomAttachmentName()
|
||||||
|
binaryData := generateRandomBinaryContent()
|
||||||
|
dataSize := len(binaryData)
|
||||||
|
globalID, _ := uuid.New().MarshalBinary()
|
||||||
|
gdbFromDate := generateRandomTimestamp(dateLowerLimit, dateUpperLimit)
|
||||||
|
gdbToDate, _ := time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
|
||||||
|
attachmentID := rand.Intn(10000) + 1
|
||||||
|
|
||||||
|
return UnknownRowValues{
|
||||||
|
gdbArchiveOid,
|
||||||
|
relGlobalID,
|
||||||
|
contentType,
|
||||||
|
attName,
|
||||||
|
dataSize,
|
||||||
|
binaryData,
|
||||||
|
globalID,
|
||||||
|
gdbFromDate,
|
||||||
|
gdbToDate,
|
||||||
|
attachmentID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// generateRandomContentType returns one MIME type picked uniformly at random
// from a fixed list of eight.
func generateRandomContentType() string {
	choices := [...]string{
		"text/plain",
		"application/pdf",
		"image/jpeg",
		"image/png",
		"application/msword",
		"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
		"text/csv",
		"application/json",
	}

	pick := rand.Intn(len(choices))
	return choices[pick]
}
|
||||||
|
|
||||||
|
func generateRandomAttachmentName() string {
|
||||||
|
extensions := []string{".txt", ".pdf", ".jpg", ".png", ".doc", ".docx", ".csv", ".json"}
|
||||||
|
baseName := generateRandomString(20)
|
||||||
|
extension := extensions[rand.Intn(len(extensions))]
|
||||||
|
return baseName + extension
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateRandomBinaryContent() []byte {
|
||||||
|
sizeOptions := []int{100, 500, 1000, 5000, 10000, 50000, 100000}
|
||||||
|
size := sizeOptions[rand.Intn(len(sizeOptions))]
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
|
||||||
|
lineCount := rand.Intn(size/50) + 1
|
||||||
|
for range lineCount {
|
||||||
|
line := generateRandomString(rand.Intn(80) + 20)
|
||||||
|
buf.WriteString(line)
|
||||||
|
buf.WriteString("\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
for buf.Len() < size {
|
||||||
|
randomText := generateRandomString(rand.Intn(100) + 50)
|
||||||
|
buf.WriteString(randomText)
|
||||||
|
buf.WriteString("\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
result := buf.Bytes()
|
||||||
|
if len(result) > size {
|
||||||
|
result = result[:size]
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func logSiteHolderAttachSampleRow(id int, rowValues UnknownRowValues) {
|
||||||
|
dataBytes := rowValues[5].([]byte)
|
||||||
|
log.Infof(`
|
||||||
|
Sample SITE_HOLDER__ATTACH row #%d:
|
||||||
|
GDB_ARCHIVE_OID: %v
|
||||||
|
REL_GLOBALID: [binary UUID]
|
||||||
|
CONTENT_TYPE: %v
|
||||||
|
ATT_NAME: %v
|
||||||
|
DATA_SIZE: %v
|
||||||
|
DATA: [%d bytes of binary content]
|
||||||
|
GLOBALID: [binary UUID]
|
||||||
|
GDB_FROM_DATE: %v
|
||||||
|
GDB_TO_DATE: %v
|
||||||
|
ATTACHMENTID: %v
|
||||||
|
`,
|
||||||
|
id,
|
||||||
|
rowValues[0],
|
||||||
|
rowValues[2],
|
||||||
|
rowValues[3],
|
||||||
|
rowValues[4],
|
||||||
|
len(dataBytes),
|
||||||
|
rowValues[7],
|
||||||
|
rowValues[8],
|
||||||
|
rowValues[9],
|
||||||
|
)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user