
Limiting concurrency in Go

When the units of work to process are known in advance and there are no resource constraints, a pattern such as scatter-gather can be used: the work is split across goroutines (scatter) and the results are then collected (gather):

package main

import "fmt"

var words = [][]string{
	{"foo", "bar", "baz"},
	{"baz", "qux", "foo"},
	{"bar", "foo", "qux"},
}

func main() {
	// scatter
	results := make(chan string, 3)
	for i := 0; i < cap(results); i++ {
		i := i
		go func() {
			results <- findFoo(words[i])
		}()
	}

	// gather
	foos := []string{}
	for i := 0; i < cap(results); i++ {
		foos = append(foos, <-results)
	}

	fmt.Println(foos)
}

func findFoo(words []string) string {
	for _, w := range words {
		if w == "foo" {
			return w
		}
	}

	return ""
}

// Output:
// [foo foo foo]
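
In the example above, results arrive in completion order, which happens not to matter because every result is the same. When the order of the inputs has to be preserved, one possible variation is to have each goroutine write to its own index of a pre-sized slice and wait with a sync.WaitGroup. The following is a minimal sketch of that idea; the scatterGather helper and its signature are hypothetical names chosen for illustration, not part of the original example.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// scatterGather (hypothetical helper) applies fn to every input in its own
// goroutine and returns the results in the same order as the inputs.
func scatterGather(inputs []string, fn func(string) string) []string {
	out := make([]string, len(inputs))

	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for i, in := range inputs {
		// scatter: one goroutine per input
		go func(i int, in string) {
			defer wg.Done()
			out[i] = fn(in) // each goroutine writes only to its own index
		}(i, in)
	}

	// gather: wait until every goroutine has stored its result
	wg.Wait()

	return out
}

func main() {
	fmt.Println(scatterGather([]string{"foo", "bar", "baz"}, strings.ToUpper))
}

// Output:
// [FOO BAR BAZ]
```

Because no two goroutines touch the same slice element, no mutex or channel is needed for the results themselves.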

However, running many goroutines at once on tasks that depend on easily saturated resources, such as file I/O or network operations, can lead to undesired results like exhausted file descriptors or an overloaded remote service. Several patterns limit how many goroutines are active at the same time.

For this kind of operation, a practical upper bound on the number of concurrent goroutines is the maximum number of file descriptors the process may open, which can be checked with ulimit -n.
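
The same limit can also be read from inside a Go program. The sketch below assumes a Unix-like system, where the standard syscall package exposes Getrlimit and RLIMIT_NOFILE; the openFileLimit helper is a hypothetical name introduced here for illustration.

```go
package main

import (
	"fmt"
	"syscall"
)

// openFileLimit (hypothetical helper) returns the soft and hard limits on
// open file descriptors for the current process. Unix-like systems only.
func openFileLimit() (soft, hard uint64, err error) {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, 0, err
	}

	return uint64(rl.Cur), uint64(rl.Max), nil
}

func main() {
	soft, hard, err := openFileLimit()
	if err != nil {
		fmt.Println("could not read limit:", err)
		return
	}

	fmt.Printf("open files: soft limit %d, hard limit %d\n", soft, hard)
}
```

The soft limit is the one that normally corresponds to ulimit -n. Recent Go releases may raise the soft limit toward the hard limit at startup, so the value printed here can be higher than what the shell reports.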

Worker pool

A worker pool limits the number of active goroutines by starting a fixed number of workers that read from a shared queue of jobs, each picking up one job at a time.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	tasks   = 10
	workers = 2
)

func main() {
	jobs := make(chan int, tasks)
	wg := sync.WaitGroup{}

	// creating a worker pool
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		i := i
		go func() {
			defer wg.Done()
			for j := range jobs {
				n := generateNumber()
				fmt.Printf("worker %d processing job %d with result %d\n", i, j, n)
			}
		}()
	}

	// sending tasks to the job queue
	for i := 0; i < tasks; i++ {
		jobs <- i
	}

	close(jobs)

	wg.Wait()
}

func generateNumber() int {
	time.Sleep(time.Second)

	return rand.Intn(10)
}

// Output (order and results vary between runs):
// worker 0 processing job 0 with result 1
// worker 1 processing job 1 with result 7
// worker 1 processing job 3 with result 7
// worker 0 processing job 2 with result 9
// worker 0 processing job 5 with result 1
// worker 1 processing job 4 with result 8
// worker 1 processing job 7 with result 5
// worker 0 processing job 6 with result 0
// worker 0 processing job 9 with result 6
// worker 1 processing job 8 with result 0
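
The workers above only print their results. When the caller needs the results back, a second channel can carry them out of the pool. The following is a minimal sketch under that assumption; runPool and the squaring "work" are hypothetical stand-ins, not part of the original example.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runPool (hypothetical helper) squares every input using a fixed number of
// workers and returns the results in completion order, not input order.
func runPool(workers int, inputs []int) []int {
	jobs := make(chan int)
	results := make(chan int)

	// creating the worker pool
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- n * n
			}
		}()
	}

	// sending jobs from a separate goroutine so the caller can gather
	// results while the queue is still being filled
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// closing the results channel once every worker has returned
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}

	return out
}

func main() {
	out := runPool(2, []int{1, 2, 3, 4, 5})
	sort.Ints(out) // sorted only to make the printed output stable
	fmt.Println(out)
}

// Output:
// [1 4 9 16 25]
```

Closing results from a dedicated goroutine after wg.Wait() is what lets the gathering loop terminate; closing it from a worker would risk a send on a closed channel.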

Semaphore

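A semaphore limits how many goroutines can be active at once. Here a buffered channel acts as the semaphore: its capacity is the limit, each running goroutine holds one slot, and sending to the channel blocks while it is full. Unlike the worker pool, one goroutine is started per task.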

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	tasks = 10
	limit = 2
)

func main() {
	sem := make(chan struct{}, limit) // creating a semaphore
	wg := sync.WaitGroup{}

	wg.Add(tasks)
	for i := 0; i < tasks; i++ {
		i := i
		sem <- struct{}{} // taking a spot of the semaphore
		go func() {
			defer func() {
				wg.Done()
				<-sem // releasing the spot
			}()
			n := generateNumber()
			fmt.Printf("processing job %d with result %d\n", i, n)
		}()
	}

	wg.Wait()
}

func generateNumber() int {
	time.Sleep(time.Second)

	return rand.Intn(10)
}

// Output (order and results vary between runs):
// processing job 1 with result 1
// processing job 0 with result 7
// processing job 3 with result 7
// processing job 2 with result 9
// processing job 5 with result 1
// processing job 4 with result 8
// processing job 6 with result 5
// processing job 7 with result 0
// processing job 8 with result 6
// processing job 9 with result 0
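
One way to check that the semaphore really enforces the limit is to count how many goroutines are running at any moment and record the highest value seen. The sketch below does that with atomic counters; peakConcurrency and the short sleep standing in for real work are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency (hypothetical helper) runs `tasks` goroutines gated by a
// channel semaphore of size `limit` and returns the highest number of
// goroutines that were running at the same time.
func peakConcurrency(tasks, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak int64

	for i := 0; i < tasks; i++ {
		sem <- struct{}{} // blocks while `limit` goroutines hold a slot
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // releasing the slot

			n := atomic.AddInt64(&running, 1)
			// store n as the new peak if it is higher than the current one
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}

			time.Sleep(10 * time.Millisecond) // stand-in for real work
			atomic.AddInt64(&running, -1)
		}()
	}

	wg.Wait()

	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(10, 2))
}
```

Since a slot is taken before the counter is incremented and released only after it is decremented, the reported peak can never exceed limit.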