feat: implement ETL process with extractors and loaders
This commit is contained in:
136
scripts/test-etl/main.go
Normal file
136
scripts/test-etl/main.go
Normal file
@@ -0,0 +1,136 @@
|
||||
package main
|
||||
|
||||
import (
	"fmt"
	"iter"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)
|
||||
|
||||
// Worker-pool and batch sizing consumed by main.
const (
	// NumExtractors is how many Extractor goroutines main starts. main also
	// uses it to size the transformer stage.
	NumExtractors int = 2

	// NumLoaders is how many Loader goroutines main starts.
	NumLoaders int = 4

	// ChunkSize is the batch size main hands to every Extractor.
	ChunkSize int = 20
)

// Internal sizing of the simulated workload.
const (
	// totalRecords is the overall number of records the run aims to generate.
	totalRecords int = 500

	// queueSize is the buffer capacity of each inter-stage channel.
	queueSize int = 10

	// recordsPerExtractor is each extractor's share of totalRecords. This is
	// integer division, so a remainder (if any) is not generated.
	recordsPerExtractor int = totalRecords / NumExtractors
)
|
||||
|
||||
// Record is the unit of data that travels through the pipeline, always in
// slices ([]Record) passed over the stage channels.
type Record struct {
	// Id is the numeric identifier given to the record when it is generated.
	Id int

	// Data is the record payload. Transformer appends a suffix to it in place.
	Data string
}
|
||||
|
||||
var idCounter int = 0
|
||||
|
||||
func generateData(max int) iter.Seq[Record] {
|
||||
time.Sleep(randomDurationMs(500, 1500))
|
||||
|
||||
return func(yield func(Record) bool) {
|
||||
for i := range max {
|
||||
record := Record{
|
||||
Id: idCounter,
|
||||
Data: fmt.Sprintf("Data-%d", i),
|
||||
}
|
||||
idCounter++
|
||||
|
||||
if !yield(record) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// randomDurationMs picks a pseudo-random duration in the half-open interval
// [min, max), expressed in milliseconds.
//
// It panics if min is negative, or if max is not strictly greater than min.
func randomDurationMs(min int, max int) time.Duration {
	switch {
	case min < 0:
		panic(fmt.Sprintf(`Invalid negative value for "min" argument: (min=%d)`, min))
	case max <= min:
		panic(fmt.Sprintf(`Invalid value for "max" argument: (max=%d), "max" should be greater than "min"`, max))
	}

	// span is at least 1 here, which is what rand.Intn requires.
	span := max - min
	offsetMs := rand.Intn(span)

	return time.Duration(min+offsetMs) * time.Millisecond
}
|
||||
|
||||
func Extractor(id int, chunkSize int, out chan<- []Record) {
|
||||
chunk := make([]Record, 0, chunkSize)
|
||||
|
||||
for record := range generateData(recordsPerExtractor) {
|
||||
chunk = append(chunk, record)
|
||||
|
||||
if len(chunk) == chunkSize {
|
||||
out <- chunk
|
||||
chunk = make([]Record, 0, chunkSize)
|
||||
fmt.Printf("[Extractor %d] Lote enviado\n", id)
|
||||
}
|
||||
}
|
||||
|
||||
if len(chunk) > 0 {
|
||||
out <- chunk
|
||||
fmt.Printf("[Extractor %d] Lote enviado (residuos=%d)\n", id, len(chunk))
|
||||
}
|
||||
}
|
||||
|
||||
func Transformer(id int, in <-chan []Record, out chan<- []Record) {
|
||||
for records := range in {
|
||||
fmt.Printf("[Transformer %d] Transformando lote de %d registros...\n", id, len(records))
|
||||
time.Sleep(randomDurationMs(50, 200))
|
||||
|
||||
for i, record := range records {
|
||||
records[i].Data = record.Data + "-transformed"
|
||||
}
|
||||
|
||||
out <- records
|
||||
}
|
||||
}
|
||||
|
||||
func Loader(id int, in <-chan []Record) {
|
||||
for chunk := range in {
|
||||
fmt.Printf("[Loader %d] Procesando lote de %d registros...\n", id, len(chunk))
|
||||
time.Sleep(randomDurationMs(100, 3000))
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
chChunksExtract := make(chan []Record, queueSize)
|
||||
chChunksTransform := make(chan []Record, queueSize)
|
||||
|
||||
var wgExtractors sync.WaitGroup
|
||||
for i := 1; i <= NumExtractors; i++ {
|
||||
wgExtractors.Go(func() {
|
||||
Extractor(i, ChunkSize, chChunksExtract)
|
||||
})
|
||||
}
|
||||
|
||||
go func() {
|
||||
wgExtractors.Wait()
|
||||
close(chChunksExtract)
|
||||
fmt.Println("--- Todos los extractores terminaron. Canal cerrado (chChunksExtract). ---")
|
||||
}()
|
||||
|
||||
var wgTransformers sync.WaitGroup
|
||||
for i := 1; i <= NumExtractors; i++ {
|
||||
wgTransformers.Go(func() {
|
||||
Transformer(i, chChunksExtract, chChunksTransform)
|
||||
})
|
||||
}
|
||||
|
||||
go func() {
|
||||
wgTransformers.Wait()
|
||||
close(chChunksTransform)
|
||||
fmt.Println("--- Todos los transformadores terminaron. Canal cerrado (chChunksTransform). ---")
|
||||
}()
|
||||
|
||||
var wgLoaders sync.WaitGroup
|
||||
for i := 1; i <= NumLoaders; i++ {
|
||||
wgLoaders.Go(func() {
|
||||
Loader(i, chChunksTransform)
|
||||
})
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
wgLoaders.Wait()
|
||||
fmt.Printf("ETL Finalizado en %v\n", time.Since(start))
|
||||
}
|
||||
Reference in New Issue
Block a user