diff --git a/scripts/test-etl/main.go b/scripts/test-etl/main.go index 8426b5a..66561f7 100644 --- a/scripts/test-etl/main.go +++ b/scripts/test-etl/main.go @@ -9,12 +9,14 @@ import ( ) const ( - NumExtractors int = 2 - NumLoaders int = 4 - ChunkSize int = 20 - totalRecords int = 500 - queueSize int = 10 - recordsPerExtractor int = totalRecords / NumExtractors + numExtractors int = 2 + numTransformers int = numExtractors + numLoaders int = 4 + chunkSize int = 20 + totalRecords int = 1000 + extractorsQueueSize int = 10 + transformersQueueSize int = 10 + recordsPerExtractor int = totalRecords / numExtractors ) type Record struct { @@ -89,18 +91,18 @@ func Transformer(id int, in <-chan []Record, out chan<- []Record) { func Loader(id int, in <-chan []Record) { for chunk := range in { fmt.Printf("[Loader %d] Procesando lote de %d registros...\n", id, len(chunk)) - time.Sleep(randomDurationMs(100, 3000)) + time.Sleep(randomDurationMs(100, 2000)) } } func main() { - chChunksExtract := make(chan []Record, queueSize) - chChunksTransform := make(chan []Record, queueSize) + chChunksExtract := make(chan []Record, extractorsQueueSize) + chChunksTransform := make(chan []Record, transformersQueueSize) var wgExtractors sync.WaitGroup - for i := 1; i <= NumExtractors; i++ { + for i := 1; i <= numExtractors; i++ { wgExtractors.Go(func() { - Extractor(i, ChunkSize, chChunksExtract) + Extractor(i, chunkSize, chChunksExtract) }) } @@ -111,7 +113,7 @@ func main() { }() var wgTransformers sync.WaitGroup - for i := 1; i <= NumExtractors; i++ { + for i := 1; i <= numTransformers; i++ { wgTransformers.Go(func() { Transformer(i, chChunksExtract, chChunksTransform) }) @@ -124,7 +126,7 @@ func main() { }() var wgLoaders sync.WaitGroup - for i := 1; i <= NumLoaders; i++ { + for i := 1; i <= numLoaders; i++ { wgLoaders.Go(func() { Loader(i, chChunksTransform) })