Data processing with Go

Data is usually processed with pipelines: a series of stages connected by channels. Each stage receives values from upstream via a receive-only channel, performs some processing on that data, usually producing new values, and sends those values downstream on a new channel that it exposes to consumers as receive-only. Once done, the stage closes its output channel to signal termination to the consumers of that channel.

package main

import "fmt"

func main() {
	// set up pipeline input
	numbers := generate(2, 4, 8)
	// first pipeline data process
	squares := square(numbers)
	// second pipeline data process
	doubles := double(squares)

	// consume pipeline output
	for n := range doubles {
		fmt.Println(n)
	}
}

func generate(nums ...int) <-chan int {
	out := make(chan int)

	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()

	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()

	return out
}

func double(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		for n := range in {
			out <- n + n
		}
		close(out)
	}()

	return out
}

// Output:
// 8
// 32
// 128
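
The close-to-signal-termination convention used by each stage can be illustrated with the two-value receive form, which is what a range loop over a channel relies on. This is a minimal sketch; the drain helper is a hypothetical name introduced here for illustration and is not part of the pipeline above.

```go
package main

import "fmt"

// drain (hypothetical helper) receives from in until the channel is closed
// and empty, then returns everything it received.
func drain(in <-chan int) []int {
	var vals []int
	for {
		n, ok := <-in
		if !ok {
			// ok is false only once the channel is closed and fully drained
			return vals
		}
		vals = append(vals, n)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch) // buffered values remain readable after close

	fmt.Println(drain(ch)) // [1 2]

	_, ok := <-ch
	fmt.Println(ok) // false
}
```

A range loop over a channel performs the same check implicitly, which is why every stage above must close its output channel for the consumer loop to end.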

When processing large datasets, one of the main challenges is memory utilization: if the working set exceeds the available memory, an out-of-memory (OOM) situation can occur. A few techniques help avoid it:

  • Increasing RAM resources.
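  • Compressing data, for example encoding repeated string values as booleans or integers.
  • Data chunking, by splitting the dataset into pieces that fit in memory and processing them one at a time or in parallel.
  • Indexing, by storing the dataset in separate logical files so that only the files needed are loaded.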
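
As an illustration of data chunking in the pipeline style used here, the sketch below adds a stage that groups incoming values into fixed-size slices, so a downstream consumer only holds one chunk at a time. The batch function and its size parameter are hypothetical names chosen for this example, and size is assumed to be greater than zero.

```go
package main

import "fmt"

// generate emits the given numbers and then closes its channel.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// batch (hypothetical stage) groups incoming values into slices of at most
// size elements. It assumes size > 0.
func batch(in <-chan int, size int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		chunk := make([]int, 0, size)
		for n := range in {
			chunk = append(chunk, n)
			if len(chunk) == size {
				out <- chunk
				// start a fresh slice; the previous one now belongs to the consumer
				chunk = make([]int, 0, size)
			}
		}
		if len(chunk) > 0 {
			out <- chunk // flush the final, possibly shorter, chunk
		}
	}()
	return out
}

func main() {
	for chunk := range batch(generate(1, 2, 3, 4, 5), 2) {
		sum := 0
		for _, n := range chunk {
			sum += n
		}
		fmt.Println(chunk, sum)
	}
}

// Output:
// [1 2] 3
// [3 4] 7
// [5] 5
```

Because the channels are unbuffered, the producer blocks until the consumer has taken the previous chunk, which keeps the number of chunks in flight small.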

When dealing with data streams, it is a good idea to limit (throttle) the processing rate to avoid exceeding quotas or exhausting resources.

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// set up pipeline input
	numbers := generate()
	// process data
	cubics := cubic(numbers)

	// consume pipeline output
	for n := range cubics {
		fmt.Println(n)
	}
}

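func generate() <-chan int {
	out := make(chan int)
	limit := 1  // sustained rate: number of events allowed per second
	burst := 10 // bucket size: number of events that may pass back-to-back before the rate applies
	throttle := rate.NewLimiter(rate.Limit(limit), burst)
	// not needed since Go 1.20, where the global source is seeded automatically
	rand.Seed(time.Now().UnixNano())

	go func() {
		// close the channel on exit so downstream range loops terminate
		defer close(out)
		for {
			// Wait blocks until the limiter allows another event
			err := throttle.Wait(context.Background())
			if err != nil {
				fmt.Printf("error throttling: %v\n", err)
				return
			}
			out <- rand.Intn(100)
		}
	}()

	return out
}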

func cubic(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		for n := range in {
			out <- n * n * n
		}
		close(out)
	}()

	return out
}
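
Where depending on golang.org/x/time/rate is not desirable, a simpler fixed-rate throttle can be sketched with time.Ticker from the standard library. This is a different technique from the token-bucket limiter above: it has no burst allowance and simply forwards at most one value per tick. The throttle function and its perSecond parameter are hypothetical names for this example, and perSecond is assumed to be greater than zero.

```go
package main

import (
	"fmt"
	"time"
)

// generate emits the given numbers and then closes its channel.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// throttle (hypothetical stage) forwards values from in at a rate of at most
// perSecond values per second. It assumes perSecond > 0.
func throttle(in <-chan int, perSecond int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(time.Second / time.Duration(perSecond))
		defer ticker.Stop()
		for n := range in {
			<-ticker.C // wait for the next tick before forwarding
			out <- n
		}
	}()
	return out
}

func main() {
	// forward roughly five values per second
	for n := range throttle(generate(1, 2, 3), 5) {
		fmt.Println(n)
	}
}
```

Since the input here is finite and closed by generate, the throttled stage also closes its output, so the consuming loop ends on its own.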

Fan-in/fan-out

To parallelize CPU and I/O usage, the fan-out pattern distributes a load among a group of workers, which all consume data from the same channel until that channel is closed.

The fan-in pattern complements it by merging the output of multiple channels into a single channel, so the results can be consumed in one place.

package main

import (
	"fmt"
	"sync"
)

func main() {
	// set up pipeline input
	numbers := generate(2, 4, 8)
	// split first pipeline data process
	squares1 := square(numbers)
	squares2 := square(numbers)
	// split second pipeline data process
	doubles1 := double(squares1)
	doubles2 := double(squares2)

	// merge and consume pipeline output
	for n := range merge(doubles1, doubles2) {
		fmt.Println(n)
	}
}

func generate(nums ...int) <-chan int {
	out := make(chan int)

	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()

	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()

	return out
}

func double(in <-chan int) <-chan int {
	out := make(chan int)

	go func() {
		for n := range in {
			out <- n + n
		}
		close(out)
	}()

	return out
}

func merge(chans ...<-chan int) <-chan int {
	out := make(chan int)
	wg := sync.WaitGroup{}

	wg.Add(len(chans))
	for _, c := range chans {
		c := c
		go func() {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}()
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// Output (order may vary between runs, since the workers run concurrently):
// 8
// 32
// 128
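
The fan-out step above starts each worker by hand. As a sketch of how it could be generalized to an arbitrary number of workers, the example below introduces two hypothetical helpers, fanOut and collect, which are not part of the original code. Because merged results arrive in no particular order, collect sorts them before printing so the output is stable.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// generate emits the given numbers and then closes its channel.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square emits the square of every value it receives.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge fans in: it forwards values from all input channels to one output
// channel and closes it once every input has been drained.
func merge(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fanOut (hypothetical helper) starts n copies of stage, all reading from the
// same input channel, and returns their output channels.
func fanOut(in <-chan int, n int, stage func(<-chan int) <-chan int) []<-chan int {
	outs := make([]<-chan int, 0, n)
	for i := 0; i < n; i++ {
		outs = append(outs, stage(in))
	}
	return outs
}

// collect (hypothetical helper) drains a channel and returns its values sorted.
func collect(in <-chan int) []int {
	var vals []int
	for n := range in {
		vals = append(vals, n)
	}
	sort.Ints(vals)
	return vals
}

func main() {
	workers := fanOut(generate(2, 4, 8), 3, square)
	fmt.Println(collect(merge(workers...)))
}

// Output:
// [4 16 64]
```

Each input value is received by exactly one worker, so the merged result contains every squared value once regardless of how many workers are started.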