Data processing with Go
Data is usually processed with pipelines: a series of stages connected by channels. Each stage receives values from upstream through an inbound channel, performs some processing on that data (usually producing new values), and sends the results downstream through an outbound channel, which the next stage sees as read-only. When a stage is done, it closes its outbound channel to signal termination to that channel's consumers.
```go
package main

import "fmt"

func main() {
	// set up pipeline input
	numbers := generate(2, 4, 8)
	// first pipeline stage
	squares := square(numbers)
	// second pipeline stage
	doubles := double(squares)
	// consume pipeline output
	for n := range doubles {
		fmt.Println(n)
	}
}

func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n + n
		}
		close(out)
	}()
	return out
}

// Output:
// 8
// 32
// 128
```
When processing large datasets, one of the main challenges is memory utilization: if the working set exceeds the available memory, an out-of-memory (OOM) condition can occur. A few techniques help avoid it:
- Increasing RAM resources.
- Compressing data, for example encoding low-cardinality strings as booleans or integers.
- Chunking the data, by splitting the dataset and processing the chunks in parallel.
- Indexing, by storing the dataset in separate logical files.
When dealing with streaming data, it is a good idea to limit or throttle the processing rate in order to avoid exceeding quotas or exhausting resources.
```go
package main

import (
	"context"
	"fmt"
	"math/rand"

	"golang.org/x/time/rate"
)

func main() {
	// set up pipeline input
	numbers := generate()
	// process data
	cubics := cubic(numbers)
	// consume pipeline output
	for n := range cubics {
		fmt.Println(n)
	}
}

func generate() <-chan int {
	out := make(chan int)
	limit := 1  // target number of events per second we want to process
	burst := 10 // maximum number of events allowed in a single burst
	throttle := rate.NewLimiter(rate.Limit(limit), burst)
	go func() {
		for {
			if err := throttle.Wait(context.Background()); err != nil {
				fmt.Printf("error throttling: %v\n", err)
				return
			}
			out <- rand.Intn(100)
		}
	}()
	return out
}

func cubic(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n * n
		}
		close(out)
	}()
	return out
}
```
Fan-in/fan-out
To parallelize CPU and I/O usage, the fan-out pattern can be used to distribute load amongst a group of workers, all consuming from the same channel until that channel is closed.
Conversely, the fan-in pattern merges the output of multiple channels into a single channel, so it can be consumed in one place.
```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// set up pipeline input
	numbers := generate(2, 4, 8)
	// fan out the first stage across two workers
	squares1 := square(numbers)
	squares2 := square(numbers)
	// fan out the second stage across two workers
	doubles1 := double(squares1)
	doubles2 := double(squares2)
	// fan in and consume the pipeline output
	for n := range merge(doubles1, doubles2) {
		fmt.Println(n)
	}
}

func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n + n
		}
		close(out)
	}()
	return out
}

func merge(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		c := c // capture the loop variable (no longer needed as of Go 1.22)
		go func() {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}()
	}
	// close the merged channel once every input channel is drained
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// Output (order may vary, since the workers run concurrently):
// 8
// 32
// 128
```