Go Concurrency - GoRoutines, Worker Pools and Throttling Made Simple

GoRoutines, WaitGroups, Workers.

Go’s concurrency is simple yet very powerful. Here are the main use cases and how to use them.

Setting the stage

For the sake of these examples, here are the structs we’ll be using:

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

Examples

Launch and forget

By far the easiest form of concurrency in Go is launch-and-forget.

Simply prefix the function call you want to run asynchronously with the go keyword, and that’s all.

For example:

go doSomething()

Alternatively, you can wrap a block of code with go func() { and }():

go func() {
    // do multiple things
}()

Keep in mind that if your program finishes before your goroutine does, the goroutine may be cut short:

func main() {
    go func() {
        time.Sleep(time.Second * 3)
        println("World") // <------ this will never execute because the program will have already exited!
    }()
    println("Hello")
}

If you want to prevent that from happening, you can use a WaitGroup (more on this later) or a channel to tell the program that everything is done:

func main() {
    finished := make(chan bool)
    go func() {
        time.Sleep(time.Second * 3)
        println("World")
        finished <- true
    }()
    println("Hello")
    <- finished
}

If you’re curious about what finished := make(chan bool), finished <- true and <- finished do:

finished := make(chan bool) creates a channel that carries boolean values, finished <- true sends the value true to the channel finished, and <- finished blocks until a value is received from finished.

Note that the result does not change whether you send true or false; in fact, you could even have used string instead of bool. The crucial part is that the program waits for the channel to receive something before moving on.

Technically, wg.Wait() (which we’ll cover below) is just like having one finished channel for each job.

In fact, channels are very powerful. For instance, the following:

func main() {
    worldChannel := make(chan string)
    dearChannel := make(chan string)
    go func() {
        time.Sleep(time.Second * 3)
        worldChannel <- "world"
    }()
    go func() {
        time.Sleep(time.Second * 2)
        dearChannel <- "dear"
    }()
    println("Hello", <- dearChannel, <- worldChannel)
}

would output Hello dear world in ~3 seconds. The total time is bounded by the slowest goroutine: the first one takes 3 seconds, and by the time it sends "world" through its channel, the second goroutine will already have sent "dear" through its own.

Anyways, I digress.

Launch and return

In order to return the result of multiple jobs, we need to make sure to wait until all jobs are done before returning.

To do that, we’ll use sync.WaitGroup, which lets us register the number of jobs that need to be completed and keep track of when they’ve all finished.

Furthermore, unlike the previous examples, we need to keep track of the results, which means we’d risk making concurrent writes to a slice, and that would cause a data race. To prevent that, we’ll use sync.RWMutex, which lets us serialize those writes. Note that there are many alternatives (a plain sync.Mutex would also do, since we only ever write here), but this is the easiest method.

Because we want to return the result of each job, our function signature will look something like this:

func launchAndReturn(jobs []Job) []JobResult {

We’ll instantiate our slice of results, RWMutex as well as WaitGroup:

    var (
        results      []JobResult
        resultsMutex = sync.RWMutex{}
        wg           sync.WaitGroup
    )

Add the number of jobs to the WaitGroup:

    wg.Add(len(jobs))

And now that we’ve set everything up, we can iterate over each job and start a new goroutine for each entry:

    for _, job := range jobs {
        go func(job Job) {

To make sure that the job is marked as done at the end of each goroutine, we’ll defer wg.Done() right now. This is important: if wg.Done() is not called as many times as the count passed to wg.Add(len(jobs)) earlier, the WaitGroup will hang forever later on (we’ll get to this).

            defer wg.Done()

Finally, we can execute the job:

            jobResult := doSomething(job)

Now that we’ve executed the job, we have to store the result: first lock the mutex, then append the result, and finally unlock the mutex:

            resultsMutex.Lock()
            results = append(results, jobResult)
            resultsMutex.Unlock()

And now we can close our goroutine:

        }(job)
    }

The last step is to wait for the WaitGroup to complete by using wg.Wait(), and then we can return the results.

    wg.Wait()
    return results
}

That’s all! This will concurrently execute each job and wait for all of them to complete. Here’s the final result:

package main

import (
    "fmt"
    "sync"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

func main() {
    var jobs []Job
    for i := 0; i < 5; i++ {
        jobs = append(jobs, Job{Id: i})
    }
    jobResults := launchAndReturn(jobs)
    fmt.Println(jobResults)
}

func launchAndReturn(jobs []Job) []JobResult {
    var (
        results      []JobResult
        resultsMutex = sync.RWMutex{}
        wg           sync.WaitGroup
    )
    wg.Add(len(jobs))

    for _, job := range jobs {
        go func(job Job) {
            defer wg.Done()
            jobResult := doSomething(job)
            resultsMutex.Lock()
            results = append(results, jobResult)
            resultsMutex.Unlock()
        }(job)
    }

    wg.Wait()
    return results
}

func doSomething(job Job) JobResult {
    fmt.Printf("Running job #%d\n", job.Id)
    return JobResult{Output: "Success"}
}

Output (job order may vary between runs):

Running job #0
Running job #4
Running job #2
Running job #3
Running job #1
[{Success} {Success} {Success} {Success} {Success}]

Launch, throttle and forget

There are two kinds of throttling: limiting the maximum number of concurrent workers, and limiting the number of jobs executed per unit of time.

Because another example of throttled concurrency will be introduced in the next section, I’ll take care of the former case here: a maximum number of concurrent workers.

There are a lot of ways to throttle, so I’ll show my favorite, which also happens to be the easiest and the simplest: run each worker on its own goroutine, and have every worker listen to the same channel for new jobs.

As just mentioned, the worker (which is just a function) runs on its own goroutine and listens to the channel for new jobs:

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job) {
    defer wg.Done()
    for job := range jobChannel {
        doSomething(id, job)
    }
}

You might be wondering, “Why is there a for loop on a channel when it’s not even a slice?”

Iterating over a channel means listening to it, and the loop continues until the channel is closed. Note that multiple workers will be listening to the same channel, but that’s fine: Go channels are designed for exactly this producer-consumer pattern.

We’ll start by creating some fake jobs:

func main() {
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

Now, we’ll create our WaitGroup and a constant with the desired number of workers:

    const NumberOfWorkers = 10
    var wg sync.WaitGroup

Because each worker runs on its own goroutine, we set our WaitGroup to wait for the number of workers, as opposed to the previous use case, where we passed the total number of jobs instead.

    wg.Add(NumberOfWorkers)
    jobChannel := make(chan Job)

Now that our channel is ready, we can start the workers. For now, they’ll just sit there and wait, since we haven’t sent any jobs to the channel yet.

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel)
    }

We send the jobs to the channel here, where they’re picked up by our workers. Note that a channel behaves like a shared queue: two workers reading from the same channel will never receive the same job.

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

Since we’ve already sent every job to the channel, we can close the channel and wait for the workers to finish.

    close(jobChannel)
    wg.Wait()
}

Final result:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job) {
    defer wg.Done()
    for job := range jobChannel {
        doSomething(id, job)
    }
}

func main() {
    start := time.Now()
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

    const NumberOfWorkers = 10
    var (
        wg               sync.WaitGroup
        jobChannel       = make(chan Job)
    )
    wg.Add(NumberOfWorkers)

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

    close(jobChannel)
    wg.Wait()
    fmt.Printf("Took %s\n", time.Since(start))
}

func doSomething(workerId int, job Job) JobResult {
    fmt.Printf("Worker #%d Running job #%d\n", workerId, job.Id)
    time.Sleep(1 * time.Second)
    return JobResult{Output: "Success"}
}

Launch, throttle and return

This use case is a mix of the two previous sections, but you won’t need an RWMutex, because we’ll be reading the results synchronously.

Unlike before, our worker function now has a new parameter, resultChannel, where we’ll send our results.

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job, resultChannel chan JobResult) {
    defer wg.Done()
    for job := range jobChannel {
        resultChannel <- doSomething(id, job)
    }
}

Nothing new here.

func main() {
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }
    const NumberOfWorkers = 10

Unlike previously, we’ll create another channel for the output, as well as a slice to store our results:
    var (
        wg               sync.WaitGroup
        jobChannel       = make(chan Job)
        jobResultChannel = make(chan JobResult, len(jobs))
        jobResults       []JobResult
    )

    wg.Add(NumberOfWorkers)

Start the workers and send the jobs to the channel which is being read by the workers.

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel, jobResultChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

Now that we’ve sent all jobs to the channel, we can close it. Then, once wg.Wait() returns, the workers will have sent every result to jobResultChannel, so we can safely close that channel as well.

    close(jobChannel)
    wg.Wait()
    close(jobResultChannel)

Read all JobResults from the channel (this loop is synchronous), and then do whatever you want with the results:

    // Receive job results from workers
    for result := range jobResultChannel {
        jobResults = append(jobResults, result)
    }

    fmt.Println(jobResults)
}

Final result:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job, resultChannel chan JobResult) {
    defer wg.Done()
    for job := range jobChannel {
        resultChannel <- doSomething(id, job)
    }
}

func main() {
    start := time.Now()
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

    const NumberOfWorkers = 10
    var wg sync.WaitGroup

    wg.Add(NumberOfWorkers)
    jobChannel := make(chan Job)
    jobResultChannel := make(chan JobResult, len(jobs))

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel, jobResultChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

    close(jobChannel)
    wg.Wait()
    close(jobResultChannel)

    var jobResults []JobResult
    // Receive job results from workers
    for result := range jobResultChannel {
        jobResults = append(jobResults, result)
    }

    fmt.Println(jobResults)
    fmt.Printf("Took %s\n", time.Since(start))
}

func doSomething(workerId int, job Job) JobResult {
    fmt.Printf("Worker #%d Running job #%d\n", workerId, job.Id)
    time.Sleep(500 * time.Millisecond)
    return JobResult{Output: "Success"}
}

Time-based throttling

TODO

This concludes the article on Go concurrency.