package main import ( "fmt" "iter" "math/rand" "sync" "time" ) const ( NumExtractors int = 2 NumLoaders int = 4 ChunkSize int = 20 totalRecords int = 500 queueSize int = 10 recordsPerExtractor int = totalRecords / NumExtractors ) type Record struct { Id int Data string } var idCounter int = 0 func generateData(max int) iter.Seq[Record] { time.Sleep(randomDurationMs(500, 1500)) return func(yield func(Record) bool) { for i := range max { record := Record{ Id: idCounter, Data: fmt.Sprintf("Data-%d", i), } idCounter++ if !yield(record) { return } } } } func randomDurationMs(min int, max int) time.Duration { if min < 0 { panic(fmt.Sprintf(`Invalid negative value for "min" argument: (min=%d)`, min)) } if max <= min { panic(fmt.Sprintf(`Invalid value for "max" argument: (max=%d), "max" should be greater than "min"`, max)) } return time.Duration(min+rand.Intn(max-min)) * time.Millisecond } func Extractor(id int, chunkSize int, out chan<- []Record) { chunk := make([]Record, 0, chunkSize) for record := range generateData(recordsPerExtractor) { chunk = append(chunk, record) if len(chunk) == chunkSize { out <- chunk chunk = make([]Record, 0, chunkSize) fmt.Printf("[Extractor %d] Lote enviado\n", id) } } if len(chunk) > 0 { out <- chunk fmt.Printf("[Extractor %d] Lote enviado (residuos=%d)\n", id, len(chunk)) } } func Transformer(id int, in <-chan []Record, out chan<- []Record) { for records := range in { fmt.Printf("[Transformer %d] Transformando lote de %d registros...\n", id, len(records)) time.Sleep(randomDurationMs(50, 200)) for i, record := range records { records[i].Data = record.Data + "-transformed" } out <- records } } func Loader(id int, in <-chan []Record) { for chunk := range in { fmt.Printf("[Loader %d] Procesando lote de %d registros...\n", id, len(chunk)) time.Sleep(randomDurationMs(100, 3000)) } } func main() { chChunksExtract := make(chan []Record, queueSize) chChunksTransform := make(chan []Record, queueSize) var wgExtractors sync.WaitGroup for i := 1; i <= NumExtractors; i++ { wgExtractors.Go(func() { Extractor(i, ChunkSize, chChunksExtract) }) } go func() { wgExtractors.Wait() close(chChunksExtract) fmt.Println("--- Todos los extractores terminaron. Canal cerrado (chChunksExtract). ---") }() var wgTransformers sync.WaitGroup for i := 1; i <= NumExtractors; i++ { wgTransformers.Go(func() { Transformer(i, chChunksExtract, chChunksTransform) }) } go func() { wgTransformers.Wait() close(chChunksTransform) fmt.Println("--- Todos los transformadores terminaron. Canal cerrado (chChunksTransform). ---") }() var wgLoaders sync.WaitGroup for i := 1; i <= NumLoaders; i++ { wgLoaders.Go(func() { Loader(i, chChunksTransform) }) } start := time.Now() wgLoaders.Wait() fmt.Printf("ETL Finalizado en %v\n", time.Since(start)) }