跳转至

Go Concurrency

Goroutines

A goroutine is a lightweight thread managed by the Go runtime.

Go
1
go f(x, y, z)

starts a new goroutine running

Go
1
f(x, y, z)

The evaluation of f, x, y, and z happens in the current goroutine and the execution of f happens in the new goroutine.

Goroutines run in the same address space, so access to shared memory must be synchronized. The sync package provides useful primitives, although you won't need them much in Go as there are other primitives. (See the next slide.)

Example

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

Output

Bash
1
2
3
4
5
6
7
8
9
hello
world
world
hello
hello
world
world
hello
hello

main 函数

  • 使用 go 关键字以 goroutine 的方式调用 say("world")
  • 直接调用 say("hello")

当运行这个程序时,会发生以下情况

  1. main 函数启动一个新的 goroutine 来执行say("world")`
  2. 不等待 say("world") 完成,main 函数立即继续执行,调用 say("hello")
  3. 此时,两个 say 函数几乎同时开始执行:
    • 一个在后台打印 "world"
    • 另一个在主线程打印 "hello"

Channels

Channels are a typed conduit through which you can send and receive values with the channel operator, <-.

Go
1
2
3
ch <- v    // Send v to channel ch.
v := <-ch  // Receive from ch, and
           // assign value to v.

(The data flows in the direction of the arrow)

Like maps and slices, channels must be created before use:

Go
1
ch := make(chan int)

By default, sends and receives block until the other side is ready. This allows goroutines to synchronize without explicit locks or condition variables.

Example

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "fmt"

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // send sum to c
}

func main() {
    s := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c // receive from c

    fmt.Println(x, y, x+y)
}

Output

Bash
1
2
3
4
5
-5 17 12

or 

17 -5 12

sum 函数

  • 接受一个整数切片 s 和一个整数类型的 channel c 作为参数
  • 计算切片中所有整数的和
  • 将计算结果发送到 channel c

main 函数

  • 定义一个整数切片 s
  • 创建一个整数类型的 channel c
  • 启动两个 goroutine,分别计算切片的前半部分和后半部分的和
  • 从 channel 中接收两个结果
  • 打印接收到的结果及其总和
Warning

main 函数执行到 x, y := <-c, <-c,开始等待从 channel 接收两个值

这一行会阻塞,直到两个 goroutine 都完成计算并将结果发送到 channel

两个 sum goroutine 并发执行:

  • 第一个计算 7 + 2 + 8 = 17,并将 17 发送到 channel
  • 第二个计算 -9 + 4 + 0 = -5,并将 -5 发送到 channel

main 函数从 channel 接收两个值,分别赋给 x 和 y

注意:接收顺序可能是不确定的,取决于哪个 goroutine 先完成计算

Buffered Channels

Channels can be buffered. Provide the buffer length as the second argument to make to initialize a buffered channel:

Go
1
ch := make(chan int, 100)

Sends to a buffered channel block only when the buffer is full. Receives block when the buffer is empty.

Modify the example to overfill the buffer and see what happens.

Channel FIFO

FIFO 原则:channel 遵循先进先出(First In, First Out)的原则

like a <queue>

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

Output

Bash
1
2
1
2

Range and Close

  • A sender can close a channel to indicate that no more values will be sent.
  • Receivers can test whether a channel has been closed by assigning a second parameter to the receive expression: after
Go
1
v, ok := <-ch

ok is false if there are no more values to receive and the channel is closed.

The loop for i := range c receives values from the channel repeatedly until it is closed, if no close, it will run permanently and then raise errors.

Close a channel

Only the sender should close a channel, never the receiver. Sending on a closed channel will cause a panic.

When to close a channel

Channels aren't like files; you don't usually need to close them. Closing is only necessary when the receiver must be told there are no more values coming, such as to terminate a range loop.

Example

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
    "fmt"
)

func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x+y
    }
    close(c)
}

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    for i := range c {
        fmt.Println(i)
    }
}

Output

Bash
1
...

在这个斐波那契数列生成程序中,close(c) 的作用是关闭channel,它有以下几个重要意义:

  1. 信号发送完成 close(c) 表示fibonacci函数已经完成了所有数据的发送[1][5]。这是一个常见的模式,用于通知接收方(这里是main函数中的for循环)不会再有更多的数据发送了。

  2. 允许range循环结束 在main函数中,for i := range c 会不断从channel中接收值,直到channel被关闭[4][6]。如果不关闭channel,这个循环将永远不会结束,可能导致程序死锁。

  3. 防止panic 关闭channel后,继续向其发送数据会导致panic[3]。通过在发送完所有数据后立即关闭channel,可以防止意外地向已完成的channel发送数据。

  4. 资源管理 虽然Go的垃圾回收器会处理未关闭的channel,但显式关闭channel是一个好习惯,特别是在更复杂的程序中[5]。

  5. 允许接收方检测channel状态 接收方可以通过检查接收操作的第二个返回值来判断channel是否已关闭[3][6]。例如:

    Go
    1
    2
    3
    4
    v, ok := <-c
    if !ok {
        // channel is close :)
    }
    

Select

The select statement lets a goroutine wait on multiple communication operations.

A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.

Select语句在Go中是一个非常重要的并发控制结构,主要用于处理多个通道(channel)操作

  1. 等待多个通道操作: Select可以同时等待多个通道的发送或接收操作,直到其中一个操作就绪.

  2. 非阻塞IO: Select允许在多个通道上执行非阻塞的IO操作,这对于并发编程非常有用.

  3. 随机选择: 当多个通道同时就绪时,Select会随机选择一个可执行的case.

  4. 阻塞和非阻塞模式

    • 如果没有default分支,Select会阻塞直到某个通道操作就绪.
    • 如果有default分支,Select会在没有通道就绪时立即执行default分支.
  5. 多路复用: Select实现了类似于IO多路复用的功能,可以同时监控多个通道的状态.

  6. 避免死锁: 通过使用Select,可以避免因等待单个通道而导致的潜在死锁情况.

  7. 超时处理: 结合time.After(),Select可以实现超时控制.

  8. 语法结构Select 的语法类似于 switch,但每个case必须是一个通道操作(发送或接收).

示例用法:

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
select {
case <-ch1:
    // 处理从ch1接收到的数据
case data := <-ch2:
    // 处理从ch2接收到的数据
case ch3 <- data:
    // 向ch3发送数据
default:
    // 当没有通道就绪时执行
}

Example

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

Output

Bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
0
1
1
2
3
5
8
13
21
34
quit
  1. main 函数开始执行:

    • 创建两个无缓冲 channel:cquit
    • 启动一个匿名 goroutine
  2. 匿名 goroutine 开始执行:

    • 准备接收 10 个斐波那契数
  3. main 函数调用 fibonacci:

    • fibonacci 函数开始执行,进入无限循环(直到后面quit传递信号,才自动return
  4. fibonacci 函数循环:

    • 使用 select 语句等待 channel 操作
  5. 生成和发送斐波那契数:

    • select 选择 case c <- x
    • 发送当前斐波那契数到 channel c
    • 更新 xy 以准备下一个数
  6. 匿名 goroutine 接收和打印:

    • 从 channel c 接收数字并打印
    • 这个过程重复 10 次
  7. 发送退出信号:

    • 匿名 goroutine 接收完 10 个数后,向 quit channel 发送信号
  8. fibonacci 函数接收退出信号:

    • select 语句检测到 quit channel 有数据可读
    • 执行 case <-quit,打印 "quit" 并返回
  9. 程序结束:

    • fibonacci 函数返回
    • main 函数结束,程序退出

Default Selection

The default case in a select is run if no other case is ready.

Use a default case to try a send or receive without blocking:

Go
1
2
3
4
5
6
select {
case i := <-c:
    // use i
default:
    // receiving from c would block
}

Example

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(100 * time.Millisecond)
    boom := time.After(500 * time.Millisecond)
    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(50 * time.Millisecond)
        }
    }
}

Output

Bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
BOOM!

Equivalent Binary Trees

task

Code

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import (
    "golang.org/x/tour/tree"
    "fmt"
)

// Walk walks the tree t sending all values
// from the tree to the channel ch.
func Walk(t *tree.Tree, ch chan int) {
    var walker func(t *tree.Tree)

    walker = func(t *tree.Tree) { // define a func and pass it to 'walker'
        if t == nil {
            return
        }
        walker(t.Left) // left child tree
        ch <- t.Value // current root
        walker(t.Right) // right child tree
    }
    walker(t) // calling walker func
    close(ch)
}

// Same determines whether the trees
// t1 and t2 contain the same values.
func Same(t1, t2 *tree.Tree) bool {
    ch1, ch2 := make(chan int), make(chan int)
    go Walk(t1, ch1)
    go Walk(t2, ch2)
    // hence we get full ch1 and ch2
    for {
        v1, ok1 := <-ch1
        v2, ok2 := <-ch2
        if ok1 != ok2 || v1 != v2 {
            return false
        }
        if !ok1 && !ok2 {
            return true
        }
    }
}

func main() {
    ch := make(chan int)
    go Walk(tree.New(1), ch)
    for i := 0; i < 10; i++ {
        fmt.Printf("%d ", <-ch)
    }
    fmt.Println()

    fmt.Println(Same(tree.New(1), tree.New(1)))
    fmt.Println(Same(tree.New(1), tree.New(2)))
}

Output

Bash
1
2
3
1 2 3 4 5 6 7 8 9 10 
true
false

sync.Mutex

We've seen how channels are great for communication among goroutines.

But what if we don't need communication? What if we just want to make sure only one goroutine can access a variable at a time to avoid conflicts?

This concept is called mutual exclusion, and the conventional name for the data structure that provides it is mutex.

Go's standard library provides mutual exclusion with sync.Mutex and its two methods:

Go
1
2
Lock
Unlock

We can define a block of code to be executed in mutual exclusion by surrounding it with a call to Lock and Unlock as shown on the Inc method.

We can also use defer to ensure the mutex will be unlocked as in the Value method.

互斥锁(Mutex)

Review

Readers who have studied databases will be familiar with this :)

Example

Go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeCounter is safe to use concurrently.
type SafeCounter struct {
    // each object itself has a mutex and mapping
    mu sync.Mutex // mutex
    v  map[string]int // mapping
}

// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    c.v[key]++
    c.mu.Unlock()
}

// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    defer c.mu.Unlock() // use defer to ensure will unlock the mutex (func returns), even panic
    return c.v[key]
}

func main() {
    c := SafeCounter{v: make(map[string]int)}
    for i := 0; i < 1000; i++ {
        go c.Inc("somekey")
    }

    time.Sleep(time.Second)
    fmt.Println(c.Value("somekey"))
}
Review: Defer
  • defer语句会将函数推迟到周围函数返回之前执行
  • 多个defer语句按LIFO(后进先出)顺序执行

Output

Bash
1
1000