This article is the result of me taking notes and trying out basic concurrency concepts presented by Rob Pike in his talk available here: YT video by Rob Pike.
Spawning a goroutine
When you just want to run a goroutine, it does not make the caller wait. The program behaves as if you just spawn a shell command with & at the end.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package main
import (
"fmt"
"math/rand"
)
func idea(text string) {
fmt.Printf("Idea #%d: %s", rand.Int31n(1000), text)
}
func main() {
go idea("Where is the spring?")
}
|
We could wait some time for the goroutine to finish:
1
2
3
4
|
func main() {
go idea("Where is the spring?")
time.Sleep(2 * time.Second)
}
|
…but there is a better way.
Channels
Here I will create a signalling channel which will notify the main program when the goroutine completes.
1
2
3
4
5
6
7
8
9
10
|
func idea(text string, end chan bool) {
fmt.Printf("Idea #%d: %s\n", rand.Int31n(1000), text)
end <- true
}
func main() {
sig := make(chan bool)
go idea("Where is the spring?", sig)
<-sig
}
|
Generator pattern
There is a pattern - called generator pattern - where you return a channel which can be used to communicate with a spawned goroutine:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func idea(text string) chan bool {
sig := make(chan bool)
go func() {
fmt.Printf("Idea #%d: %s\n", rand.Int31n(1000), text)
sig <- true
}()
return sig
}
func main() {
sig := idea("Where is the spring?")
<-sig
}
|
Receive-only channel
Here the function idea returns a channel on which a caller main can receive data:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func idea(topic string) <-chan string {
c := make(chan string)
// this routine forever generates ideas and sends them to c
go func() {
for i := 0; ; i++ {
c <- fmt.Sprintf("Idea #%d: %s\n", i, topic)
time.Sleep(time.Duration(200))
}
}()
return c
}
func main() {
spring := idea("Spring activity")
// we need just 5
for range 5 {
fmt.Printf("> %s", <-spring)
}
}
|
Fan-in
This pattern allows to gather several channels into a single channel by
- creating a new target channel
- for each channel, spawning a routine that inifinitely copies values to a target channel
- reading values from target
Values read from target channel are not coupled to any of the source channels
in a way that they don’t wait for each other: if one is faster, it will generate
more messages to the target channel in the unit of time.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func fanIn(inp1, inp2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
c <- <-inp1
}
}()
go func() {
for {
c <- <-inp2
}
}()
return c
}
func main() {
springChan := idea("Spring activity")
winterChan := idea("Winter play")
joined := fanIn(springChan, winterChan)
// we need just 5
for range 100 {
fmt.Printf("> %s", <-joined)
}
}
|
Each source waits for go ahead
Let’s say we want to keep the sequencing, i.e. need to make routines wait for each other.
We need to send a special wait channel which will be part of a message. The message will not be a string anymore, but a struct with both string value and a wait channel.
Interestingly, all messages will share the same wait chan.
1
2
3
4
|
type Message struct {
value string
wait chan bool
}
|
Now, we let only single message be processed from each of faned-out source channels until its wait chan is notified we can process more.
- when a message from a source channel is sent to a fan-out, it immediatelly reads wait chan
- messages can be created and added to other source channels (possibly with this blocked wait chan)
- wait chan will be unblocked when messages from all other sources are processed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func idea(topic string, wait int) <-chan Message {
c := make(chan Message)
waitchan := make(chan bool)
// this routine forever generates ideas and sends them to c
go func() {
for i := 0; ; i++ {
c <- Message{fmt.Sprintf("%s - %d\n", topic, i), waitchan}
time.Sleep(time.Duration(wait))
<-waitchan
}
}()
return c
}
func withFanIn(cnt int, springDelay int, winterDelay int) {
{
spring := idea("Spring activity", springDelay)
winter := idea("Winter activity", winterDelay)
fan := fanIn(spring, winter)
for range cnt {
m1 := <-fan
fmt.Printf("> %s", m1.value)
m2 := <-fan
fmt.Printf("> %s", m2.value)
m1.wait <- true
m2.wait <- true
}
}
}
func main() {
withFanIn(10, 200, 1000)
}
|
The above solution is a bit tricky: it requires putting a channel into each message as a means to synchronize reading and writing of messages to preserve ordering (or better: sequencing) of messages.
Select
Rules:
- all channels are evaluated
- selection blicks until one communication can proceed
- it proceeds; if many can, the one to proceed is selected randomly
- if default clause exists, it executes immediately if no channels are ready
- ** cases are comminication** - either a send or a receive
fan-in pattern using select
With select, only one goroutine is needed:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func fanIn[T comparable](inp1, inp2 <-chan T) <-chan T {
c := make(chan T)
go func() {
for {
select {
case s := <-inp1:
c <- s
case s := <-inp2:
c <- s
}
}
}()
return c
}
|
Timeout using select
time.After function returns a channel that blocks for a specified duration. After this interval, the channel delivers current time, once.
Below, we create spring messages after a random duration less than 5s, however, we don’t wait for any of the messages for longer than 2s:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
type Message struct {
value string
}
func idea(topic string, randMs int) <-chan Message {
c := make(chan Message)
// this routine forever generates ideas and sends them to c
go func() {
for i := 0; ; i++ {
sleeps := time.Duration(rand.Intn(randMs)) * time.Millisecond
fmt.Printf("Will think for %v [ms]...", sleeps)
time.Sleep(sleeps)
c <- Message{fmt.Sprintf("%s - %d [ms %s]\n", topic, i, sleeps)}
}
}()
return c
}
func waitWithTimeout() {
// we generate each spring idea within random time of less than 5 s ...
spring := idea("Spring activity", 5000)
for {
select {
case s := <-spring:
fmt.Printf("> %s", s.value)
case <-time.After(2000 * time.Millisecond):
// ... and if the thinking takes longer than 2 s, we quit
fmt.Println("too slow!")
return
}
}
}
func main() {
waitWithTimeout()
}
|
We can also contrain whole idea generation process to take no longer than 5s:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func waitWithTimeout() {
// we generate each spring idea within random time of less than 5 s ...
spring := idea("Spring activity", 5000)
// ... and we don't let the whole generation take more than 5s
timeout := time.After(5000 * time.Millisecond)
for {
select {
case <-time.Tick(1 * time.Second):
fmt.Println("[Second passed]")
case s := <-spring:
fmt.Printf("> %s", s.value)
case <-timeout:
fmt.Println("Timeout!")
return
}
}
}
|
Resources