12 Commits

Author SHA1 Message Date
0c59d06af6 refactor: adjust configuration parameters for extractors and transformers; optimize batch processing settings 2026-05-09 02:07:09 -05:00
6fa9b21b1c refactor: update totalRows and chunkSize constants for improved performance 2026-05-09 01:49:39 -05:00
a8be31c18b refactor: adjust configuration parameters for extractors and loaders; enhance logging messages for clarity 2026-05-09 01:48:36 -05:00
5a8bce7701 refactor: update totalRows constant and add siteHolderAttach data generation logic; enhance row generation and loading process 2026-05-09 01:33:12 -05:00
b690e580c5 refactor: enhance logging and batch processing in migration; adjust configuration parameters for improved performance 2026-05-09 01:16:34 -05:00
68d983ea57 refactor: remove unused Extractor and Loader interfaces from types.go; streamline code structure 2026-05-09 00:32:32 -05:00
1bc7b67643 refactor: replace Exec with Consume method in MssqlTransformer; enhance retry handling and streamline transformation logic 2026-05-09 00:30:49 -05:00
d124da8b20 refactor: enhance error messages for max retries in partition and batch processing 2026-05-08 23:36:59 -05:00
b5fd6d0534 refactor: remove unused LoaderError type and associated file; streamline error handling structure 2026-05-08 23:32:07 -05:00
85d7d69da9 refactor: streamline error handling in migration process; consolidate failed partitions and batches tracking 2026-05-08 23:22:53 -05:00
d54108d5e5 refactor: move max failed batches configuration to retry section; clean up unused error handling code 2026-05-08 23:00:23 -05:00
212d3663e2 refactor: remove unused error channels and enhance job configuration; add max failed batches load parameter 2026-05-08 22:28:31 -05:00
21 changed files with 754 additions and 457 deletions

View File

@@ -86,7 +86,7 @@ func main() {
log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed) log.Infof("Migración terminada. Tablas: %d, Errores: %d, Filas totales: %d", len(results), totalErrors, totalProcessed)
totalDuration := time.Since(startTime) totalDuration := time.Since(startTime)
log.Infof("=== Migration completed successfully! ===") // log.Infof("=== Migration completed successfully! ===")
log.Infof("Total migration time: %v", totalDuration) log.Infof("Total migration time: %v", totalDuration)
} }

View File

@@ -57,8 +57,6 @@ func processMigrationJob(
StartTime: time.Now(), StartTime: time.Now(),
} }
var rowsRead, rowsLoaded, rowsFailed int64
var wgQueryColumnTypes errgroup.Group var wgQueryColumnTypes errgroup.Group
var sourceColTypes, targetColTypes []models.ColumnType var sourceColTypes, targetColTypes []models.ColumnType
@@ -114,16 +112,13 @@ func processMigrationJob(
} }
chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize) chJobErrors := make(chan custom_errors.JobError, jobErrorsChannelSize)
chLoadersErrors := make(chan custom_errors.LoaderError, job.ExtractorQueueSize) chPartitions := make(chan models.Partition)
chPartitions := make(chan models.Partition, job.ExtractorQueueSize)
chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize) chBatchesRaw := make(chan models.Batch, job.ExtractorQueueSize)
chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize) chBatchesTransformed := make(chan models.Batch, job.TransformerQueueSize)
var wgActivePartitions sync.WaitGroup var wgActivePartitions, wgActiveBatches, wgExtractors, wgTransformers, wgLoaders sync.WaitGroup
var wgActiveBatches sync.WaitGroup var rowsRead, rowsLoaded, rowsFailed int64
var wgExtractors sync.WaitGroup var failedPartitionsCount, failedBatchesLoadCount int32
var wgTransformers sync.WaitGroup
var wgLoaders sync.WaitGroup
go func() { go func() {
if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil { if err := custom_errors.JobErrorHandler(localCtx, chJobErrors); err != nil {
@@ -133,18 +128,8 @@ func processMigrationJob(
} }
}() }()
go custom_errors.LoaderErrorHandler(
localCtx,
job.Retry,
job.MaxExtractorBatchErrors,
chLoadersErrors,
chBatchesTransformed,
chJobErrors,
&wgActiveBatches,
)
maxExtractors := min(job.MaxExtractors, len(partitions)) maxExtractors := min(job.MaxExtractors, len(partitions))
log.Infof("Starting %d extractor(s)...", maxExtractors) log.Infof("Starting %d extractor(s)... (%v)", maxExtractors, job.Name)
for range maxExtractors { for range maxExtractors {
wgExtractors.Go(func() { wgExtractors.Go(func() {
@@ -159,6 +144,7 @@ func processMigrationJob(
chJobErrors, chJobErrors,
&wgActivePartitions, &wgActivePartitions,
&rowsRead, &rowsRead,
&failedPartitionsCount,
job.SourceTable.FromJsonColumns, job.SourceTable.FromJsonColumns,
) )
}) })
@@ -171,13 +157,15 @@ func processMigrationJob(
} }
}() }()
log.Infof("Starting %d transformer(s)...", maxExtractors) log.Infof("Starting %d transformer(s)... (%v)", maxExtractors, job.Name)
for range maxExtractors { for range maxExtractors {
wgTransformers.Go(func() { wgTransformers.Go(func() {
transformer.Exec( transformer.Consume(
localCtx, localCtx,
sourceColTypes, sourceColTypes,
job.Retry,
job.TransformerBatchSize,
chBatchesRaw, chBatchesRaw,
chBatchesTransformed, chBatchesTransformed,
chJobErrors, chJobErrors,
@@ -186,7 +174,7 @@ func processMigrationJob(
}) })
} }
log.Infof("Starting %d loader(s)...", job.MaxLoaders) log.Infof("Starting %d loader(s)... (%v)", job.MaxLoaders, job.Name)
for range job.MaxLoaders { for range job.MaxLoaders {
wgLoaders.Go(func() { wgLoaders.Go(func() {
@@ -195,39 +183,39 @@ func processMigrationJob(
job.TargetTable, job.TargetTable,
targetColTypes, targetColTypes,
job.Retry, job.Retry,
job.LoaderBatchSize,
chBatchesTransformed, chBatchesTransformed,
chJobErrors, chJobErrors,
&wgActiveBatches, &wgActiveBatches,
&rowsLoaded, &rowsLoaded,
&failedBatchesLoadCount,
) )
}) })
} }
go func() { go func() {
log.Debugf("Waiting for goroutines (%v)", job.Name) // log.Debugf("Waiting for goroutines (%v)", job.Name)
wgActivePartitions.Wait() wgActivePartitions.Wait()
log.Debugf("wgActivePartitions is empty (%v)", job.Name) // log.Debugf("wgActivePartitions is empty (%v)", job.Name)
close(chPartitions) close(chPartitions)
log.Debugf("chPartitions is closed (%v)", job.Name) // log.Debugf("chPartitions is closed (%v)", job.Name)
wgExtractors.Wait() wgExtractors.Wait()
log.Debugf("wgExtractors is empty (%v)", job.Name) // log.Debugf("wgExtractors is empty (%v)", job.Name)
close(chBatchesRaw) close(chBatchesRaw)
log.Debugf("chBatchesRaw is closed (%v)", job.Name) // log.Debugf("chBatchesRaw is closed (%v)", job.Name)
wgTransformers.Wait() wgTransformers.Wait()
log.Debugf("wgTransformers is empty (%v)", job.Name) // log.Debugf("wgTransformers is empty (%v)", job.Name)
close(chBatchesTransformed)
// log.Debugf("chBatchesTransformed is closed (%v)", job.Name)
wgActiveBatches.Wait() wgActiveBatches.Wait()
log.Debugf("wgActiveBatches is empty (%v)", job.Name) // log.Debugf("wgActiveBatches is empty (%v)", job.Name)
close(chBatchesTransformed)
log.Debugf("chBatchesTransformed is empty (%v)", job.Name)
close(chLoadersErrors)
log.Debugf("chLoadersErrors is empty (%v)", job.Name)
wgLoaders.Wait() wgLoaders.Wait()
log.Debugf("wgLoaders is empty (%v)", job.Name) // log.Debugf("wgLoaders is empty (%v)", job.Name)
cancel() cancel()
}() }()
@@ -239,9 +227,9 @@ func processMigrationJob(
} }
} }
log.Debugf("waiting for local context to be done (%v)", job.Name) // log.Debugf("waiting for local context to be done (%v)", job.Name)
<-localCtx.Done() <-localCtx.Done()
log.Debugf("local context done (%v)", job.Name) // log.Debugf("local context done (%v)", job.Name)
if ctx.Err() != nil { if ctx.Err() != nil {
result.Error = ctx.Err() result.Error = ctx.Err()
@@ -256,5 +244,9 @@ func processMigrationJob(
result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed) result.Error = fmt.Errorf("Row count mismatch: extracted %d rows but loaded %d rows (failed: %d)", result.RowsRead, result.RowsLoaded, result.RowsFailed)
} }
if result.RowsRead == 0 {
log.Warnf("No rows extracted from (%v)", job.Name)
}
return result return result
} }

View File

@@ -3,24 +3,24 @@ source_db_type: sqlserver
target_db_type: postgres target_db_type: postgres
defaults: defaults:
batches_per_partition: 8 batches_per_partition: 4
max_extractors: 2 max_extractors: 2
extractor_batch_size: 25000 extractor_batch_size: 5000
extractor_queue_size: 8 extractor_queue_size: 8
max_transformers: 2 max_transformers: 2
transformer_batch_size: 25000 transformer_batch_size: 12500
transformer_queue_size: 8 transformer_queue_size: 8
max_loaders: 4 max_loaders: 4
loader_batch_size: 25000 loader_batch_size: 25000
truncate_target: true truncate_target: true
truncate_method: TRUNCATE # TRUNCATE | DELETE truncate_method: TRUNCATE # TRUNCATE | DELETE
max_partition_errrors: 5
max_extractor_batch_errors: 5
retry: retry:
attempts: 3 attempts: 3
base_delay_ms: 500 base_delay_ms: 500
max_delay_ms: 10000 max_delay_ms: 10000
max_jitter_ms: 500 max_jitter_ms: 500
max_failed_partitions: 5
max_failed_batches_load: 5
jobs: jobs:
- name: cartografia_manzana - name: cartografia_manzana
@@ -70,10 +70,15 @@ jobs:
- source: DATA - source: DATA
target: FILE_URL target: FILE_URL
mode: REFERENCE_ONLY mode: REFERENCE_ONLY
max_extractors: 8 batches_per_partition: 20
max_extractors: 32
extractor_batch_size: 1
extractor_queue_size: 100
max_transformers: 48
transformer_batch_size: 500
transformer_queue_size: 8
max_loaders: 4 max_loaders: 4
queue_size: 32 loader_batch_size: 500
batch_size: 1
retry: retry:
attempts: 5 attempts: 5
base_delay_ms: 1000 base_delay_ms: 1000

View File

@@ -8,10 +8,12 @@ import (
) )
type RetryConfig struct { type RetryConfig struct {
Attempts int `yaml:"attempts"` Attempts int `yaml:"attempts"`
BaseDelayMs int `yaml:"base_delay_ms"` BaseDelayMs int `yaml:"base_delay_ms"`
MaxDelayMs int `yaml:"max_delay_ms"` MaxDelayMs int `yaml:"max_delay_ms"`
MaxJitterMs int `yaml:"max_jitter_ms"` MaxJitterMs int `yaml:"max_jitter_ms"`
MaxFailedPartitions int `yaml:"max_failed_partitions"`
MaxFailedBatchesLoad int `yaml:"max_failed_batches_load"`
} }
type ToStorageColumnConfig struct { type ToStorageColumnConfig struct {
@@ -25,22 +27,20 @@ type ToStorageConfig struct {
} }
type JobConfig struct { type JobConfig struct {
BatchesPerPartition int `yaml:"batches_per_partition"` BatchesPerPartition int `yaml:"batches_per_partition"`
MaxExtractors int `yaml:"max_extractors"` MaxExtractors int `yaml:"max_extractors"`
ExtractorBatchSize int `yaml:"extractor_batch_size"` ExtractorBatchSize int `yaml:"extractor_batch_size"`
ExtractorQueueSize int `yaml:"extractor_queue_size"` ExtractorQueueSize int `yaml:"extractor_queue_size"`
MaxTransformers int `yaml:"max_transformers"` MaxTransformers int `yaml:"max_transformers"`
TransformerBatchSize int `yaml:"transformer_batch_size"` TransformerBatchSize int `yaml:"transformer_batch_size"`
TransformerQueueSize int `yaml:"transformer_queue_size"` TransformerQueueSize int `yaml:"transformer_queue_size"`
MaxLoaders int `yaml:"max_loaders"` MaxLoaders int `yaml:"max_loaders"`
LoaderBatchSize int `yaml:"loader_batch_size"` LoaderBatchSize int `yaml:"loader_batch_size"`
TruncateTarget bool `yaml:"truncate_target"` TruncateTarget bool `yaml:"truncate_target"`
TruncateMethod string `yaml:"truncate_method"` TruncateMethod string `yaml:"truncate_method"`
MaxPartitionErrrors int `yaml:"max_partition_errrors"` Retry RetryConfig `yaml:"retry"`
MaxExtractorBatchErrors int `yaml:"max_extractor_batch_errors"` RowsPerPartition int64
Retry RetryConfig `yaml:"retry"` ToStorage ToStorageConfig `yaml:"to_storage"`
RowsPerPartition int64
ToStorage ToStorageConfig `yaml:"to_storage"`
} }
type FromJsonItem struct { type FromJsonItem struct {

View File

@@ -1,7 +1,6 @@
package custom_errors package custom_errors
import ( import (
"context"
"math/rand" "math/rand"
"time" "time"
) )
@@ -40,22 +39,3 @@ func ComputeBackoffDelay(retryCounter int, baseDelayMs int, maxDelayMs int, maxJ
return delay return delay
} }
func requeueWithBackoff(ctx context.Context, delay time.Duration, enqueue func()) {
if delay <= 0 {
enqueue()
return
}
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return
case <-timer.C:
enqueue()
}
}()
}

View File

@@ -1,107 +0,0 @@
package custom_errors
import (
"context"
"fmt"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
type LoaderError struct {
Batch models.Batch
Msg string
}
func (e *LoaderError) Error() string {
return e.Msg
}
func LoaderErrorHandler(
ctx context.Context,
retryConfig config.RetryConfig,
maxChunkErrors int,
chErrorsIn <-chan LoaderError,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- JobError,
wgActiveBatches *sync.WaitGroup,
) {
definitiveErrors := 0
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case err, ok := <-chErrorsIn:
if !ok {
return
}
if err.Batch.RetryCounter >= retryConfig.Attempts {
wgActiveBatches.Done()
definitiveErrors++
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Batch %v reached max retries (%d)", err.Batch.Id, retryConfig.Attempts),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
if maxChunkErrors > 0 && definitiveErrors >= maxChunkErrors {
fatalError := JobError{
ShouldCancelJob: true,
Msg: fmt.Sprintf("Chunk error limit reached (%d)", maxChunkErrors),
Prev: &err,
}
select {
case chJobErrorsOut <- fatalError:
case <-ctx.Done():
return
}
}
continue
} else {
jobError := JobError{
ShouldCancelJob: false,
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", err.Batch.Id, err.Batch.RetryCounter),
Prev: &err,
}
select {
case chJobErrorsOut <- jobError:
case <-ctx.Done():
return
}
}
err.Batch.RetryCounter++
delay := ComputeBackoffDelay(
err.Batch.RetryCounter,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
requeueWithBackoff(ctx, delay, func() {
select {
case chBatchesOut <- err.Batch:
case <-ctx.Done():
return
}
})
}
}
}

View File

@@ -14,3 +14,12 @@ type ExtractorError struct {
func (e *ExtractorError) Error() string { func (e *ExtractorError) Error() string {
return e.Msg return e.Msg
} }
type LoaderError struct {
Batch models.Batch
Msg string
}
func (e *LoaderError) Error() string {
return e.Msg
}

View File

@@ -10,7 +10,6 @@ import (
dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects" dbdialects "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/db-wrapper/db_dialects"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
mssql "github.com/microsoft/go-mssqldb" mssql "github.com/microsoft/go-mssqldb"
"github.com/sirupsen/logrus"
) )
func init() { func init() {
@@ -188,8 +187,6 @@ func buildExtractQueryMssql(q ExtractionQuery) (string, error) {
hasRegularColumns := len(q.Columns) > 0 hasRegularColumns := len(q.Columns) > 0
hasJsonColumns := len(q.FromJsonColumns) > 0 hasJsonColumns := len(q.FromJsonColumns) > 0
// logrus.Debugf("Extraction query: %+v", q)
resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns)) resolvedJson := make(map[string][]config.FromJsonItem, len(q.FromJsonColumns))
if hasJsonColumns { if hasJsonColumns {
for _, jsonConfig := range q.FromJsonColumns { for _, jsonConfig := range q.FromJsonColumns {
@@ -296,7 +293,7 @@ func (mw *mssqlDbWrapper) QueryFromObject(ctx context.Context, q ExtractionQuery
return nil, err return nil, err
} }
logrus.Debugf("Query: %s", queryString) // logrus.Debugf("Query: %s", queryString)
var queryArgs []any var queryArgs []any

View File

@@ -25,6 +25,7 @@ func (ex *GenericExtractor) Consume(
chErrorsOut chan<- custom_errors.JobError, chErrorsOut chan<- custom_errors.JobError,
wgActivePartitions *sync.WaitGroup, wgActivePartitions *sync.WaitGroup,
rowsRead *int64, rowsRead *int64,
failedPartitionsCount *int32,
fromJsonColumns []config.FromJsonItem, fromJsonColumns []config.FromJsonItem,
) { ) {
indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool { indexPrimaryKey := slices.IndexFunc(columns, func(col models.ColumnType) bool {
@@ -72,11 +73,12 @@ func (ex *GenericExtractor) Consume(
if rowsReadResult > 0 { if rowsReadResult > 0 {
current := atomic.LoadInt64(rowsRead) current := atomic.LoadInt64(rowsRead)
logrus.Debugf("Rows read: +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table) logrus.Debugf("Rows read (partition extracted): +%v [current=%v] (%s.%s)", rowsReadResult, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsRead, int64(rowsReadResult)) atomic.AddInt64(rowsRead, int64(rowsReadResult))
} }
if err != nil { if err != nil {
atomic.AddInt32(failedPartitionsCount, 1)
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -90,6 +92,16 @@ func (ex *GenericExtractor) Consume(
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
} }
} }
currentFPCount := atomic.LoadInt32(failedPartitionsCount)
if currentFPCount > int32(retryConfig.MaxFailedPartitions) {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed partitions reached"}:
return
}
}
} }
} }
} }

View File

@@ -50,7 +50,7 @@ func (ex *GenericExtractor) ProcessPartitionWithRetries(
if currentParitition.RetryCounter >= retryConfig.Attempts { if currentParitition.RetryCounter >= retryConfig.Attempts {
return totalRowsRead, &custom_errors.JobError{ return totalRowsRead, &custom_errors.JobError{
Msg: fmt.Sprintf("Partition %v reached max retries", exError.Partition.Id), Msg: fmt.Sprintf("Partition %v reached max retries (%d)", currentParitition.Id, currentParitition.RetryCounter),
Prev: err, Prev: err,
} }
} }

View File

@@ -9,6 +9,8 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
) )
func (gl *GenericLoader) Consume( func (gl *GenericLoader) Consume(
@@ -16,15 +18,77 @@ func (gl *GenericLoader) Consume(
tableInfo config.TargetTableInfo, tableInfo config.TargetTableInfo,
columns []models.ColumnType, columns []models.ColumnType,
retryConfig config.RetryConfig, retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch, chBatchesIn <-chan models.Batch,
chErrorsOut chan<- custom_errors.JobError, chErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup, wgActiveBatches *sync.WaitGroup,
rowsLoaded *int64, rowsLoaded *int64,
failedBatchesCount *int32,
) { ) {
colNames := mapSlice(columns, func(col models.ColumnType) string { colNames := mapSlice(columns, func(col models.ColumnType) string {
return col.Name() return col.Name()
}) })
var accRows []models.UnknownRowValues
var parentBatchesId []uuid.UUID
pendingDone := 0
defer func() {
for range pendingDone {
wgActiveBatches.Done()
}
}()
flush := func() bool {
if len(accRows) == 0 {
return true
}
count := len(parentBatchesId)
superBatch := models.Batch{
Id: uuid.New(),
ParentBatchesId: parentBatchesId,
Rows: accRows,
}
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, superBatch)
for range count {
wgActiveBatches.Done()
}
pendingDone -= count
accRows = nil
parentBatchesId = nil
if err != nil {
atomic.AddInt32(failedBatchesCount, 1)
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case <-ctx.Done():
return false
case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return false
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
}
if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
select {
case <-ctx.Done():
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
}
return false
}
return true
}
current := atomic.LoadInt64(rowsLoaded)
logrus.Debugf("Rows loaded (batch loaded): +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsLoaded, int64(processedRows))
return true
}
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
return return
@@ -35,28 +99,55 @@ func (gl *GenericLoader) Consume(
return return
case batch, ok := <-chBatchesIn: case batch, ok := <-chBatchesIn:
if !ok { if !ok {
flush()
return return
} }
processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch) if batchSize <= 0 {
wgActiveBatches.Done() processedRows, err := gl.ProcessBatchWithRetries(ctx, tableInfo, colNames, retryConfig, batch)
wgActiveBatches.Done()
if err != nil { if err != nil {
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok { atomic.AddInt32(failedBatchesCount, 1)
select { if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
case <-ctx.Done(): select {
return case <-ctx.Done():
case chErrorsOut <- *jobError: return
case chErrorsOut <- *jobError:
}
} else {
select {
case <-ctx.Done():
return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}:
}
} }
} else {
select { if atomic.LoadInt32(failedBatchesCount) > int32(retryConfig.MaxFailedBatchesLoad) {
case <-ctx.Done(): select {
return case <-ctx.Done():
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: false, Msg: err.Error(), Prev: err}: return
case chErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Max failed batches (load) reached"}:
return
}
} }
continue
} }
} else {
current := atomic.LoadInt64(rowsLoaded)
logrus.Debugf("Rows loaded: +%v [current=%v] (%s.%s)", processedRows, current, tableInfo.Schema, tableInfo.Table)
atomic.AddInt64(rowsLoaded, int64(processedRows)) atomic.AddInt64(rowsLoaded, int64(processedRows))
continue
}
pendingDone++
accRows = append(accRows, batch.Rows...)
parentBatchesId = append(parentBatchesId, batch.Id)
if len(accRows) >= batchSize {
if !flush() {
return
}
} }
} }
} }

View File

@@ -29,9 +29,8 @@ func (gl *GenericLoader) ProcessBatchWithRetries(
if batch.RetryCounter >= retryConfig.Attempts { if batch.RetryCounter >= retryConfig.Attempts {
return rowsLoaded, &custom_errors.JobError{ return rowsLoaded, &custom_errors.JobError{
ShouldCancelJob: false, Msg: fmt.Sprintf("Batch %v reached max retries (%d)", batch.Id, batch.RetryCounter),
Msg: fmt.Sprintf("Temporal error in batch %v (retries: %d)", btError.Batch.Id, btError.Batch.RetryCounter), Prev: btError,
Prev: btError,
} }
} }

View File

@@ -0,0 +1,117 @@
package transformers
import (
"context"
"errors"
"sync"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
)
func (mssqlTr *MssqlTransformer) Consume(
ctx context.Context,
columns []models.ColumnType,
retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
) {
transformationPlan := computeTransformationPlan(columns)
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
transformationPlan = append(transformationPlan, storagePlan...)
var accRows []models.UnknownRowValues
var parentBatchesId []uuid.UUID
var firstPartitionId uuid.UUID
flush := func() bool {
if len(accRows) == 0 {
return true
}
out := models.Batch{
Id: uuid.New(),
PartitionId: firstPartitionId,
ParentBatchesId: parentBatchesId,
Rows: accRows,
}
select {
case chBatchesOut <- out:
wgActiveBatches.Add(1)
case <-ctx.Done():
return false
}
accRows = nil
parentBatchesId = nil
firstPartitionId = uuid.Nil
return true
}
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
flush()
return
}
if len(transformationPlan) > 0 {
err := ProcessBatchWithRetries(ctx, &batch, transformationPlan, retryConfig)
if err != nil {
if errors.Is(err, ctx.Err()) {
return
}
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
select {
case chJobErrorsOut <- *jobError:
case <-ctx.Done():
return
}
} else {
select {
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
return
}
}
return
}
}
if batchSize <= 0 {
select {
case chBatchesOut <- batch:
wgActiveBatches.Add(1)
case <-ctx.Done():
return
}
continue
}
if len(parentBatchesId) == 0 {
firstPartitionId = batch.PartitionId
}
accRows = append(accRows, batch.Rows...)
parentBatchesId = append(parentBatchesId, batch.Id)
if len(accRows) >= batchSize {
if !flush() {
return
}
}
}
}
}

View File

@@ -1,20 +1,9 @@
package transformers package transformers
import ( import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
) )
type MssqlTransformer struct { type MssqlTransformer struct {
@@ -30,196 +19,3 @@ func NewMssqlTransformer(toStorage config.ToStorageConfig, sourceTable config.So
azureClient: azureClient, azureClient: azureClient,
} }
} }
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
var plan []etl.ColumnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
}
func computeStorageTransformationPlan(
ctx context.Context,
azureClient *azure.Client,
toStorage config.ToStorageConfig,
sourceColumns []models.ColumnType,
sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
if azureClient == nil || len(toStorage.Columns) == 0 {
return nil
}
colIndex := make(map[string]int, len(sourceColumns))
for i, col := range sourceColumns {
colIndex[strings.ToUpper(col.Name())] = i
}
var plan []etl.ColumnTransformPlan
for _, storageCol := range toStorage.Columns {
if storageCol.Mode != "REFERENCE_ONLY" {
log.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
continue
}
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
if !ok {
log.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
continue
}
sourceColName := storageCol.Source
schema := sourceTable.Schema
table := sourceTable.Table
plan = append(plan, etl.ColumnTransformPlan{
Index: idx,
Fn: func(v any) (any, error) {
if v == nil {
return nil, nil
}
b, ok := v.([]byte)
if !ok {
log.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
schema, table, sourceColName, v)
return v, nil
}
start := time.Now()
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
if err != nil {
return nil, fmt.Errorf("uploading %s.%s.%s: %w", schema, table, sourceColName, err)
}
log.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
return blobURL, nil
},
})
}
return plan
}
const processBatchCtxCheck = 4096
func (mssqlTr *MssqlTransformer) ProcessBatch(
ctx context.Context,
batch *models.Batch,
transformationPlan []etl.ColumnTransformPlan,
) error {
for i, rowValues := range batch.Rows {
if i%processBatchCtxCheck == 0 {
if err := ctx.Err(); err != nil {
return err
}
}
for _, task := range transformationPlan {
val := rowValues[task.Index]
if val == nil {
continue
}
transformed, err := task.Fn(val)
if err != nil {
return err
}
rowValues[task.Index] = transformed
}
}
return nil
}
func (mssqlTr *MssqlTransformer) Exec(
ctx context.Context,
columns []models.ColumnType,
chBatchesIn <-chan models.Batch,
chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup,
) {
transformationPlan := computeTransformationPlan(columns)
storagePlan := computeStorageTransformationPlan(ctx, mssqlTr.azureClient, mssqlTr.toStorage, columns, mssqlTr.sourceTable)
transformationPlan = append(transformationPlan, storagePlan...)
for {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case batch, ok := <-chBatchesIn:
if !ok {
return
}
if len(transformationPlan) == 0 {
select {
case chBatchesOut <- batch:
wgActiveBatches.Add(1)
continue
case <-ctx.Done():
return
}
}
err := mssqlTr.ProcessBatch(ctx, &batch, transformationPlan)
if err != nil {
if errors.Is(err, ctx.Err()) {
return
}
select {
case chJobErrorsOut <- custom_errors.JobError{ShouldCancelJob: true, Msg: "Transformation failed", Prev: err}:
case <-ctx.Done():
}
return
}
select {
case chBatchesOut <- batch:
case <-ctx.Done():
return
}
wgActiveBatches.Add(1)
}
}
}

View File

@@ -0,0 +1,122 @@
package transformers
import (
"context"
"fmt"
"strings"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/azure"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
func computeTransformationPlan(columns []models.ColumnType) []etl.ColumnTransformPlan {
var plan []etl.ColumnTransformPlan
for i, col := range columns {
switch col.SystemType() {
case "uniqueidentifier":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return mssqlUuidToBigEndian(b)
}
return v, nil
},
})
case "geometry", "geography":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if b, ok := v.([]byte); ok && b != nil {
return wkbToEwkbWithSrid(b, 4326)
}
return v, nil
},
})
case "datetime", "datetime2":
plan = append(plan, etl.ColumnTransformPlan{
Index: i,
Fn: func(v any) (any, error) {
if t, ok := v.(time.Time); ok {
return ensureUTC(t), nil
}
return v, nil
},
})
}
}
return plan
}
func computeStorageTransformationPlan(
ctx context.Context,
azureClient *azure.Client,
toStorage config.ToStorageConfig,
sourceColumns []models.ColumnType,
sourceTable config.SourceTableInfo,
) []etl.ColumnTransformPlan {
if azureClient == nil || len(toStorage.Columns) == 0 {
return nil
}
colIndex := make(map[string]int, len(sourceColumns))
for i, col := range sourceColumns {
colIndex[strings.ToUpper(col.Name())] = i
}
var plan []etl.ColumnTransformPlan
for _, storageCol := range toStorage.Columns {
if storageCol.Mode != "REFERENCE_ONLY" {
logrus.Warnf("to_storage: unsupported mode %q for column %s — skipping", storageCol.Mode, storageCol.Source)
continue
}
idx, ok := colIndex[strings.ToUpper(storageCol.Source)]
if !ok {
logrus.Warnf("to_storage: source column %q not found in source schema — skipping", storageCol.Source)
continue
}
sourceColName := storageCol.Source
schema := sourceTable.Schema
table := sourceTable.Table
plan = append(plan, etl.ColumnTransformPlan{
Index: idx,
Fn: func(v any) (any, error) {
if v == nil {
return nil, nil
}
b, ok := v.([]byte)
if !ok {
logrus.Warnf("to_storage: expected []byte for %s.%s.%s, got %T — passing through",
schema, table, sourceColName, v)
return v, nil
}
// start := time.Now()
blobPath := fmt.Sprintf("%s/%s/%s", schema, table, uuid.New().String())
blobURL, err := azureClient.UploadAndGetURL(ctx, blobPath, b)
if err != nil {
return nil, &custom_errors.JobError{
Msg: fmt.Sprintf("Error uploading %s.%s.%s", schema, table, sourceColName),
Prev: err,
}
}
// logrus.Debugf(`Succesfully uploaded "%s", (%vms)`, blobURL, time.Since(start).Milliseconds())
return blobURL, nil
},
})
}
return plan
}

View File

@@ -0,0 +1,73 @@
package transformers
import (
"context"
"errors"
"time"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/config"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/custom_errors"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/etl"
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
)
const processBatchCtxCheck = 4096
func ProcessBatchWithRetries(
ctx context.Context,
batch *models.Batch,
transformationPlan []etl.ColumnTransformPlan,
retryConfig config.RetryConfig,
) error {
for i, rowValues := range batch.Rows {
if i%processBatchCtxCheck == 0 {
if err := ctx.Err(); err != nil {
return err
}
}
for _, task := range transformationPlan {
val := rowValues[task.Index]
if val == nil {
continue
}
var lastErr error
success := false
for attempt := 0; attempt < retryConfig.Attempts; attempt++ {
transformed, err := task.Fn(val)
if err == nil {
rowValues[task.Index] = transformed
success = true
break
}
lastErr = err
if jobError, ok := errors.AsType[*custom_errors.JobError](err); ok {
if jobError.ShouldCancelJob {
return jobError
}
}
if attempt == retryConfig.Attempts-1 {
break
}
delay := custom_errors.ComputeBackoffDelay(
attempt,
retryConfig.BaseDelayMs,
retryConfig.MaxDelayMs,
retryConfig.MaxJitterMs,
)
time.Sleep(delay)
}
if !success {
return lastErr
}
}
}
return nil
}

View File

@@ -1 +0,0 @@
package transformers

View File

@@ -9,18 +9,6 @@ import (
"git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models" "git.ksdemosapps.com/kylesoda/go-migrate/internal/app/models"
) )
type Extractor interface {
ProcessPartition(
ctx context.Context,
tableInfo config.SourceTableInfo,
columns []models.ColumnType,
batchSize int,
partition models.Partition,
indexPrimaryKey int,
chBatchesOut chan<- models.Batch,
) (int, error)
}
type TransformerFunc func(any) (any, error) type TransformerFunc func(any) (any, error)
type ColumnTransformPlan struct { type ColumnTransformPlan struct {
@@ -29,31 +17,18 @@ type ColumnTransformPlan struct {
} }
type Transformer interface { type Transformer interface {
ProcessBatch( Consume(
ctx context.Context,
batch *models.Batch,
transformationPlan []ColumnTransformPlan,
) error
Exec(
ctx context.Context, ctx context.Context,
columns []models.ColumnType, columns []models.ColumnType,
retryConfig config.RetryConfig,
batchSize int,
chBatchesIn <-chan models.Batch, chBatchesIn <-chan models.Batch,
chBactchesOut chan<- models.Batch, chBatchesOut chan<- models.Batch,
chJobErrorsOut chan<- custom_errors.JobError, chJobErrorsOut chan<- custom_errors.JobError,
wgActiveBatches *sync.WaitGroup, wgActiveBatches *sync.WaitGroup,
) )
} }
type Loader interface {
ProcessBatch(
ctx context.Context,
tableInfo config.TargetTableInfo,
colNames []string,
batch models.Batch,
) (int64, error)
}
type TableAnalyzer interface { type TableAnalyzer interface {
QueryColumnTypes( QueryColumnTypes(
ctx context.Context, ctx context.Context,

View File

@@ -9,10 +9,11 @@ import (
type UnknownRowValues = []any type UnknownRowValues = []any
type Batch struct { type Batch struct {
Id uuid.UUID Id uuid.UUID
PartitionId uuid.UUID PartitionId uuid.UUID
Rows []UnknownRowValues ParentBatchesId []uuid.UUID
RetryCounter int Rows []UnknownRowValues
RetryCounter int
} }
type PartitionRange struct { type PartitionRange struct {

View File

@@ -12,9 +12,10 @@ import (
) )
const ( const (
totalRows int = 1_000_000 // totalRows int = 1_000_000
chunkSize int = 50_000 totalRows int = 1000
queueSize int = 4 chunkSize int = 200
queueSize int = 4
) )
func main() { func main() {
@@ -40,6 +41,14 @@ func main() {
seedManzanas(ctx, db) seedManzanas(ctx, db)
}) })
wgSeed.Go(func() {
seedPuertos(ctx, db)
})
wgSeed.Go(func() {
seedSiteHolderAttach(ctx, db)
})
wgSeed.Wait() wgSeed.Wait()
} }

View File

@@ -0,0 +1,227 @@
package main
import (
"bytes"
"context"
"database/sql"
"fmt"
"math/rand"
"sync"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
var siteHolderAttachJob = MigrationJob{
Schema: "Infraestructura",
Table: "SITE_HOLDER__ATTACH",
}
func seedSiteHolderAttach(ctx context.Context, db *sql.DB) error {
maxOid, err := getMaxGDBArchiveOidForAttach(ctx, db)
if err != nil {
log.Fatal("Error getting max GDB_ARCHIVE_OID: ", err)
}
log.Infof("Starting SITE_HOLDER__ATTACH data generation from GDB_ARCHIVE_OID: %d", maxOid+1)
rowsChan := make(chan []UnknownRowValues, queueSize)
var wgRowGenerator sync.WaitGroup
wgRowGenerator.Go(func() {
generateSiteHolderAttachRows(ctx, maxOid, totalRows, chunkSize, rowsChan)
})
columns := []string{
"GDB_ARCHIVE_OID",
"REL_GLOBALID",
"CONTENT_TYPE",
"ATT_NAME",
"DATA_SIZE",
"DATA",
"GLOBALID",
"GDB_FROM_DATE",
"GDB_TO_DATE",
"ATTACHMENTID",
}
if err := loadRowsMssql(ctx, siteHolderAttachJob, columns, db, rowsChan); err != nil {
return fmt.Errorf("Error loading rows (SITE_HOLDER__ATTACH): %w", err)
}
log.Info("Data generation and loading completed successfully (SITE_HOLDER__ATTACH)")
wgRowGenerator.Wait()
return nil
}
func getMaxGDBArchiveOidForAttach(ctx context.Context, db *sql.DB) (int, error) {
var maxOid sql.NullInt64
query := fmt.Sprintf(`
SELECT ISNULL(MAX(GDB_ARCHIVE_OID), 0)
FROM [%s].[%s]
`, siteHolderAttachJob.Schema, siteHolderAttachJob.Table)
err := db.QueryRowContext(ctx, query).Scan(&maxOid)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
if !maxOid.Valid {
return 0, nil
}
return int(maxOid.Int64), nil
}
func generateSiteHolderAttachRows(
ctx context.Context,
startOid int,
totalRows int,
chunkSize int,
out chan<- []UnknownRowValues,
) {
defer close(out)
rowsGenerated := 0
currentChunk := make([]UnknownRowValues, 0, chunkSize)
for i := range totalRows {
gdbArchiveOid := startOid + i + 1
row := generateSiteHolderAttachRow(gdbArchiveOid)
currentChunk = append(currentChunk, row)
rowsGenerated++
if len(currentChunk) == chunkSize {
select {
case out <- currentChunk:
log.Debugf("Sent SITE_HOLDER__ATTACH chunk with %d rows", len(currentChunk))
case <-ctx.Done():
log.Info("Context cancelled, stopping SITE_HOLDER__ATTACH row generation")
return
}
currentChunk = make([]UnknownRowValues, 0, chunkSize)
}
if rowsGenerated%100_000 == 0 {
logSiteHolderAttachSampleRow(rowsGenerated, row)
}
}
if len(currentChunk) > 0 {
select {
case out <- currentChunk:
log.Debugf("Sent final SITE_HOLDER__ATTACH chunk with %d rows", len(currentChunk))
case <-ctx.Done():
log.Info("Context cancelled, stopping SITE_HOLDER__ATTACH row generation")
}
}
log.Infof("Finished generating %d SITE_HOLDER__ATTACH rows", rowsGenerated)
}
func generateSiteHolderAttachRow(gdbArchiveOid int) UnknownRowValues {
dateLowerLimit, _ := time.Parse(time.RFC3339, "2020-12-31T23:59:59Z")
dateUpperLimit, _ := time.Parse(time.RFC3339, "2025-12-31T23:59:59Z")
relGlobalID, _ := uuid.New().MarshalBinary()
contentType := generateRandomContentType()
attName := generateRandomAttachmentName()
binaryData := generateRandomBinaryContent()
dataSize := len(binaryData)
globalID, _ := uuid.New().MarshalBinary()
gdbFromDate := generateRandomTimestamp(dateLowerLimit, dateUpperLimit)
gdbToDate, _ := time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
attachmentID := rand.Intn(10000) + 1
return UnknownRowValues{
gdbArchiveOid,
relGlobalID,
contentType,
attName,
dataSize,
binaryData,
globalID,
gdbFromDate,
gdbToDate,
attachmentID,
}
}
func generateRandomContentType() string {
contentTypes := []string{
"text/plain",
"application/pdf",
"image/jpeg",
"image/png",
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"text/csv",
"application/json",
}
return contentTypes[rand.Intn(len(contentTypes))]
}
func generateRandomAttachmentName() string {
extensions := []string{".txt", ".pdf", ".jpg", ".png", ".doc", ".docx", ".csv", ".json"}
baseName := generateRandomString(20)
extension := extensions[rand.Intn(len(extensions))]
return baseName + extension
}
func generateRandomBinaryContent() []byte {
sizeOptions := []int{100, 500, 1000, 5000, 10000, 50000, 100000}
size := sizeOptions[rand.Intn(len(sizeOptions))]
var buf bytes.Buffer
lineCount := rand.Intn(size/50) + 1
for range lineCount {
line := generateRandomString(rand.Intn(80) + 20)
buf.WriteString(line)
buf.WriteString("\n")
}
for buf.Len() < size {
randomText := generateRandomString(rand.Intn(100) + 50)
buf.WriteString(randomText)
buf.WriteString("\n")
}
result := buf.Bytes()
if len(result) > size {
result = result[:size]
}
return result
}
func logSiteHolderAttachSampleRow(id int, rowValues UnknownRowValues) {
dataBytes := rowValues[5].([]byte)
log.Infof(`
Sample SITE_HOLDER__ATTACH row #%d:
GDB_ARCHIVE_OID: %v
REL_GLOBALID: [binary UUID]
CONTENT_TYPE: %v
ATT_NAME: %v
DATA_SIZE: %v
DATA: [%d bytes of binary content]
GLOBALID: [binary UUID]
GDB_FROM_DATE: %v
GDB_TO_DATE: %v
ATTACHMENTID: %v
`,
id,
rowValues[0],
rowValues[2],
rowValues[3],
rowValues[4],
len(dataBytes),
rowValues[7],
rowValues[8],
rowValues[9],
)
}