From dfc76eecd057999c6f0f2eca09e827ea1dc1419a Mon Sep 17 00:00:00 2001 From: Kylesoda <249518290+kylesoda@users.noreply.github.com> Date: Mon, 6 Apr 2026 13:50:36 -0500 Subject: [PATCH] feat: implement ETL process with extractors and loaders --- scripts/test-etl/main.go | 136 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 scripts/test-etl/main.go diff --git a/scripts/test-etl/main.go b/scripts/test-etl/main.go new file mode 100644 index 0000000..8426b5a --- /dev/null +++ b/scripts/test-etl/main.go @@ -0,0 +1,136 @@ +package main + +import ( + "fmt" + "iter" + "math/rand" + "sync" + "time" +) + +const ( + NumExtractors int = 2 + NumLoaders int = 4 + ChunkSize int = 20 + totalRecords int = 500 + queueSize int = 10 + recordsPerExtractor int = totalRecords / NumExtractors +) + +type Record struct { + Id int + Data string +} + +var idCounter int = 0 + +func generateData(max int) iter.Seq[Record] { + time.Sleep(randomDurationMs(500, 1500)) + + return func(yield func(Record) bool) { + for i := range max { + record := Record{ + Id: idCounter, + Data: fmt.Sprintf("Data-%d", i), + } + idCounter++ + + if !yield(record) { + return + } + } + } +} + +func randomDurationMs(min int, max int) time.Duration { + if min < 0 { + panic(fmt.Sprintf(`Invalid negative value for "min" argument: (min=%d)`, min)) + } + + if max <= min { + panic(fmt.Sprintf(`Invalid value for "max" argument: (max=%d), "max" should be greater than "min"`, max)) + } + + return time.Duration(min+rand.Intn(max-min)) * time.Millisecond +} + +func Extractor(id int, chunkSize int, out chan<- []Record) { + chunk := make([]Record, 0, chunkSize) + + for record := range generateData(recordsPerExtractor) { + chunk = append(chunk, record) + + if len(chunk) == chunkSize { + out <- chunk + chunk = make([]Record, 0, chunkSize) + fmt.Printf("[Extractor %d] Lote enviado\n", id) + } + } + + if len(chunk) > 0 { + out <- chunk + fmt.Printf("[Extractor %d] Lote enviado (residuos=%d)\n", id, len(chunk)) + } +} + +func Transformer(id int, in <-chan []Record, out chan<- []Record) { + for records := range in { + fmt.Printf("[Transformer %d] Transformando lote de %d registros...\n", id, len(records)) + time.Sleep(randomDurationMs(50, 200)) + + for i, record := range records { + records[i].Data = record.Data + "-transformed" + } + + out <- records + } +} + +func Loader(id int, in <-chan []Record) { + for chunk := range in { + fmt.Printf("[Loader %d] Procesando lote de %d registros...\n", id, len(chunk)) + time.Sleep(randomDurationMs(100, 3000)) + } +} + +func main() { + chChunksExtract := make(chan []Record, queueSize) + chChunksTransform := make(chan []Record, queueSize) + + var wgExtractors sync.WaitGroup + for i := 1; i <= NumExtractors; i++ { + wgExtractors.Go(func() { + Extractor(i, ChunkSize, chChunksExtract) + }) + } + + go func() { + wgExtractors.Wait() + close(chChunksExtract) + fmt.Println("--- Todos los extractores terminaron. Canal cerrado (chChunksExtract). ---") + }() + + var wgTransformers sync.WaitGroup + for i := 1; i <= NumExtractors; i++ { + wgTransformers.Go(func() { + Transformer(i, chChunksExtract, chChunksTransform) + }) + } + + go func() { + wgTransformers.Wait() + close(chChunksTransform) + fmt.Println("--- Todos los transformadores terminaron. Canal cerrado (chChunksTransform). ---") + }() + + var wgLoaders sync.WaitGroup + for i := 1; i <= NumLoaders; i++ { + wgLoaders.Go(func() { + Loader(i, chChunksTransform) + }) + } + + start := time.Now() + wgLoaders.Wait() + fmt.Printf("ETL Finalizado en %v\n", time.Since(start)) +}