Golang的并发模式:管道和撤销

Go的并发原语可轻松构建有效利用I/O和多CPU的流水线管道。本文介绍了这种管道的示例,突出描述失败时的细微之处,并介绍了优雅地处理故障的技术。

1 什么是管道?

Go没有对于管道的正式定义,它只是并发模式中的一种。 非正式的,管道就是通过 channel 连接的一系列片段,其中每个片段是一组功能相同的 goroutine 。 在每个片段, goroutine 完成如下功能:

  • 通过 inbound channel 从上流片段接受数据
  • 在数据上执行计算,通常产生新数据
  • 把新数据通过 outbound channel 传到下流片段

每个片段都有任意数量的 inboundoutbound channel , 当然第一个和最后一个排除在外,因为前者只有 outbound channel, 后者只有 inbound channel 。 第一片段有时称为 sourceproducer ; 最后阶段,/sink/ 或 consumer

从一个简单的示例管道开始,解释思路和技巧。 之后,呈现一个更加现实的例子。

2 平方数示例

当前示例,管道包含三个片段。

第一个片段 gen , 把整数数组转换成一个可取出整数的 channelgen 函数启动 goroutine 发生数据到 channel ,发送完成后关闭 channel:

1
2
3
4
5
6
7
8
9
10
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

第二个片段 sq ,从一个 channel 中接收整数,并返回一个可取出每个接收的整数的平方的 channel 。 在 inbound channel 关闭后,此片段已将所有值发送到下游,然后关闭 outbound channel :

1
2
3
4
5
6
7
8
9
10
  func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

main 函数 设置流水线并运行最后一个片段:从第二片段接收值并打印,直到 channel 关闭:

1
2
3
4
5
6
7
8
9
 func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)

// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}

由于 sqinbound channeloutbound channel 具有相同的类型,因此可以迭代使用任意次数。 重写 main ,使其像其他片段,循环从 inbound channel 取出数据:

1
2
3
4
5
6
  func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}

3 扇出,扇入

多个函数从相同的 channel 读取,直到该 channel 关闭; 这被称为扇出。 这提供了在一组 worker 之间分配工作以并行化CPU使用和 I/O 的方式。

一个函数可以从多个输入接收数据,并进行数据处理,直到所有的输入 channel 多路复合到单个 channel 上,当所有输入都关闭时,复合的 channel 关闭。 这被称为扇入。

改变的管道流程, 运行两个 sq 的实例,每个实例从相同的输入 channel 读取(扇出)。 引入新的函数 merge 以演示扇入:

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
in := gen(2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)

// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}

merge 函数将 channel 列表转换为单个通道,为每个 inbound channel 启动一个 goroutine , 来将 inbound channel 值复制到唯一 outbound channel 。 一旦所有的输出 goroutines 都已经启动后, 再启动一个 goroutine , 待所有的 channel 发送完成后来关闭 outbound channel

往一个 closed channel 发送数据将会 panic ,所以要确保所有的发送都是在 channel 关闭之前完成的。 sync.WaitGroup 提供了一种简单的同步方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}

4 稍作停顿

管道函数有如下模式:

  • 当所有发送操作完成时,片段关闭其 outbound channel
  • 片段不断接收来自 inbound channel 的值,直到这些 channel 关闭。

该模式允许每个接收片段使用 range loop , 以确保所有值都已成功发送到下游后退出所有 goroutine

但是在实际管道应用中,片段并不总是能够收到所有 inbound 值。 有时这是被设计:接收者可能只需要一个子集来进行后续处理。 更常见的情况是,由于 inbound 值显示在早期片段引入了错误,因此片段应该早早的退出。 在以上任一情况下,接收者不必等待剩余的值到达,并且我们希望较早的片段停止生成稍后片段不需要的值。

在上面的管道示例中,如果片段无法使用所有 inbound 值,则尝试发送这些值的 goroutines 将无限期地阻塞下去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
in := gen(2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)


// Consume the first value from output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Since we didn't receive the second value from out,
// one of the output goroutines is hung attempting to send it.
}

这是一个资源泄漏: goroutines 消耗内存和运行时资源, goroutine 栈中的堆引用使数据不被垃圾回收。 goroutines 不被垃圾收集机制回收; 它们必须自己退出。

即使下游片段没能接收所有 inbound 值,管道的上游片段也可能需要提前退出。 执行此操作的一种方法是将 outbound channel 更改为具有缓冲区。 缓冲区可以保存固定数量的值; 如果缓冲区中有空间,则发送立即完成 :

1
2
3
4
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1

在创建 channel 时,若是知道将要发送的数据量,缓冲区可以简化代码。 例如,可重写 gen 拷贝整数到 channel 的缓冲区中, 避免 goroutine 创建:

1
2
3
4
5
6
7
8
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}

回到管道中阻塞的 goroutine ,可以考虑为 merge 返回的 outbound channel 添加一个缓冲区:

1
2
3
4
5
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
}

虽然上面的代码不再阻塞,但上面的代码依赖于当前是知道将要接收到多少数据的和要往下流发送多少数据。 这样的代码不健壮,如果 gen 接收的数据多于 1 , 或者下流只接收一部分值,那么将会永久的阻塞 ~goroutine~。 固定长度的缓存不可取,相应的在代码中需要为下流片段提供一种通用的方法,来通知上流片段它们将停止接收输入。

5 明确取消

main 函数决定不再从上游片段/outbound channel/ 接收数据时, 它需要告诉上流片段的 goroutine 终止发送操作。 下面的代码演示了如何通知, 通过往称为 donechannel 上发送值来实现。 发送两个值,因为有两个被阻止的发送者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  func main() {
runtime.GOMAXPROCS(runtime.NumCPU())

in := gen(2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)

// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9

// Tell the remaining senders we're leaving.
done <- struct{}{}
done <- struct{}{}

fmt.Println(<-out) // 4 or 9

go func() {
time.Sleep(2*time.Second)
}()
return
}

发送 goroutines 使用 select 语句替换其发送操作,该语句在发送发生时或从 done 接收到值时继续进行。 done 值类型是空结构体,因为该值无关紧要:它只是接收事件,表示应该放弃后续发送。 输出 goroutine 在其 inbound channel 上继续循环,因此上游片段不被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(name string, c <-chan int) {
defer func(){
fmt.Println("[goroutine ruturn][", name, "]")
wg.Done()
}()
for n := range c {
fmt.Println("[IN LOOP][BEFORE select][", name, "]")
select {
case out <- n:
fmt.Println("[IN LOOP][FROM c][", name, "]")
case <-done:
fmt.Println("[IN LOOP][FROM done][", name, "]")
return
}
}

fmt.Println("[AFTER LOOP][", name, "]")
}

wg.Add(len(cs))
for index, c := range cs {
name := fmt.Sprintf("goroutine %d", index)
go output(name, c)
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
fmt.Println("[AFTER WAIT]")
close(out)
}()
return out
}

上面的方法有一个问题:每个下游接收者都需要知道潜在阻塞的上游发送者的数量, 并安排在提前返回时向这些发送者发信号。 跟踪这些计数是乏味和容易出错的。

其实我们需要一种方法来告诉未知的无限数量的 goroutine 停止往下游发送它们的值。 在 Go 中,可以通过关闭 channel 来执行此操作, 因为关闭 channel 上的接收操作都是立刻完成的,产生相应数据类型的零值。

这意味着 main 函数可以通过关闭 done channel 来解除所有发件人的阻塞。 这个关闭操作实际上是发送者的广播信号。 重新编排管道函数, 添加 done channel 的延迟关闭函数 , 从而使 main 函数的所有返回路径都会发出信号,以使管道片段退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 func main() {
runtime.GOMAXPROCS(runtime.NumCPU())

// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)

in := gen(done, 2, 3)

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)

// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9

// done will be closed by the deferred call.
}

现在每个管道片段可以在 channel 关闭后轻松的返回, merge 中的 output routine 不用担心 inbound channel 的数据,因为 当 done channel 关闭时,上游发送者会停止数据的发送。 output 通过 defer 语句确保在所有返回路径上调用 wg.Done :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}

wg.Add(len(cs))
for _, c := range cs {
go output(c)
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out

}

类型的, sq 可在 done 关闭后直接返回。 sq 通过 defer 语句确保在所有返回路径上关闭 out channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}

gen 大体和 sq 类型, 在 done 返回, 通过 defer 语句确保 out channel 关闭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func gen(done <-chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return
}
}
}()
return out
}

管道构建的指导方针:

  • 当所有发送操作完成时,片段关闭其 outbound channel
  • 片段持续从 inbound channel 中接收值,直到这些 channel 关闭或发件人被取消阻塞。

管道有两种方式能解除发送者的阻塞:

  • 确保所有发送的值都有足够的缓冲区, 有足够的缓冲区就不会阻塞了。
  • 当接收方放弃从 channel 接收数据时,显式地发送信号来解除发送者的阻塞

6 MD5摘要示例

现在来看看更真实的管道应用。

MD5 是一种消息摘要算法,可用作文件校验和。 命令行实用程序 md5sum 打印文件列表的摘要值。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

示例程序像 md5sum ,但是以单个目录作为参数,并打印该目录下每个常规文件的摘要值,并按路径名排序。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

示例程序的 main 函数 调用一个辅助函数 MD5All ,它返回一个从路径名到摘要值的 map ,然后排序并打印结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}

6.1 串行版

MD5All 函数是讨论的焦点。 在 serial.go 中的实现不使用并发性,只是在遍历文件树时读取和计算校验和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
m := make(map[string][md5.Size]byte)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
m[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return m, nil
}

6.2 并行版

parallel.go ,将 MD5All 函数分为两级管道。 第一级, sumFiles ,遍历树, 为每个文件做校验和创建 goroutine , 并将结果发送到 result 类型的 channel 上:

1
2
3
4
5
type result struct {
path string
sum [md5.Size]byte
err error
}

sumFiles 返回两个 channel :一个用于传递结果,另一个用于返回 filepath.Walk 返回的错误。 walk 函数启动一个新的 goroutine 来处理每个常规文件,然后检查 done 。 如果完成关闭, walk 将立即停止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the result on c. Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() {
wg.Wait()
close(c)
}()
// No select needed here, since errc is buffered.
errc <- err
}()
return c, errc
}

MD5Allchannel 中接受摘要值,返回最早出现的错误,并通过 defer 来关闭 done

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)

c, errc := sumFiles(done, root)

m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}

6.3 受限的并行

parallel.goMD5All 实现中为每个文件创建一个 goroutine 。 试想一下,若是一个目录中有许多大文件,上面的实现,很可能导致资源枯竭。 可以通过限制同时打开的问题个数来解决资源占用的问题。在 bounded.go 中, 创建固定数量的用于读取文件的 goroutine 。 重新设计流程,包含三个片段: 遍历目录树, 读取文件生成摘要,收集摘要。

第一个片段 walkFiles , 过滤出常规文件路径,同时往下游发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Close the paths channel after Walk returns.
defer close(paths)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}

中间片段启动固定数量的 digester goroutinepaths channel 中接受文件名, 通过 c channel 来回写摘要值:

1
2
3
4
5
6
7
8
9
10
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}

与以前的示例不同,/digester/ 不会关闭其输出 channel ,因为多个 goroutine 正在共享 channel 上发送。 相反,当所有的 digester 完成时,MD5All中的代码会关闭 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error. In that case,
// MD5All does not wait for inflight read operations to complete.
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)

paths, errc := walkFiles(done, root)

// Start a fixed number of goroutines to read and digest files.
c := make(chan result) // HLc
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c) // HLc
wg.Done()
}()
}
go func() {
wg.Wait()
close(c) // HLc
}()
// End of pipeline. OMIT

m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil { // HLerrc
return nil, err
}
return m, nil
}

可以让每个 digester 创建并返回自己的输出 channel ,但是后面需要额外的 goroutine 来扇入结果。

最后片段接收 c 的所有结果,然后从 errc 检查错误。 此检查不能早于从 c 中接受数据,因为在此之前, walkFiles 可能会被下游阻塞。

7 总结

本文介绍了Go中构建管道流的技术。 处理这种管道中的故障是很棘手的,因为管道中的任一片段都可能被尝试发送下游值而阻塞,并且下游片段可能也不再关心或者需要输入数据。 本文展示了如何关闭 channel 方式来向管道启动的所有 goroutines 广播“完成”信号,并且正确地定义了管道构建的准则。

Last Updated 2017-05-23 Tue 13:28.
Render by hexo-renderer-org with Emacs 25.3.2 (Org mode 8.2.10)