Go的并发原语可轻松构建有效利用I/O和多CPU的流水线管道。本文介绍了这种管道的示例,突出描述失败时的细微之处,并介绍了优雅地处理故障的技术。
1 什么是管道?
Go没有对于管道的正式定义,它只是并发模式中的一种。
非正式的,管道就是通过 channel
连接的一系列片段,其中每个片段是一组功能相同的 goroutine
。
在每个片段, goroutine
完成如下功能:
- 通过 inbound channel 从上流片段接受数据
- 在数据上执行计算,通常产生新数据
- 把新数据通过 outbound channel 传到下流片段
每个片段都有任意数量的 inbound 和 outbound channel , 当然第一个和最后一个排除在外,因为前者只有 outbound channel, 后者只有 inbound channel 。 第一片段有时称为 source 或 producer ; 最后阶段,/sink/ 或 consumer 。
从一个简单的示例管道开始,解释思路和技巧。 之后,呈现一个更加现实的例子。
2 平方数示例
当前示例,管道包含三个片段。
第一个片段 gen , 把整数数组转换成一个可取出整数的 channel
。
gen 函数启动 goroutine
发生数据到 channel
,发送完成后关闭 channel
:
1 | func gen(nums ...int) <-chan int { |
第二个片段 sq ,从一个 channel
中接收整数,并返回一个可取出每个接收的整数的平方的 channel
。
在 inbound channel 关闭后,此片段已将所有值发送到下游,然后关闭 outbound channel :
1 | func sq(in <-chan int) <-chan int { |
main 函数 设置流水线并运行最后一个片段:从第二片段接收值并打印,直到 channel
关闭:
1 | func main() { |
由于 sq 的 inbound channel 和 outbound channel 具有相同的类型,因此可以迭代使用任意次数。 重写 main ,使其像其他片段,循环从 inbound channel 取出数据:
1 | func main() { |
3 扇出,扇入
多个函数从相同的 channel
读取,直到该 channel
关闭; 这被称为扇出。 这提供了在一组 worker 之间分配工作以并行化CPU使用和 I/O 的方式。
一个函数可以从多个输入接收数据,并进行数据处理,直到所有的输入 channel
多路复合到单个 channel
上,当所有输入都关闭时,复合的 channel
关闭。 这被称为扇入。
改变的管道流程, 运行两个 sq 的实例,每个实例从相同的输入 channel 读取(扇出)。 引入新的函数 merge 以演示扇入:
1 | func main() { |
merge 函数将 channel
列表转换为单个通道,为每个 inbound channel 启动一个 goroutine
,
来将 inbound channel 值复制到唯一 outbound channel 。
一旦所有的输出 goroutines
都已经启动后,
再启动一个 goroutine
, 待所有的 channel 发送完成后来关闭 outbound channel 。
往一个 closed channel 发送数据将会 panic ,所以要确保所有的发送都是在 channel 关闭之前完成的。
sync.WaitGroup
提供了一种简单的同步方法:
1 | func merge(cs ...<-chan int) <-chan int { |
4 稍作停顿
管道函数有如下模式:
- 当所有发送操作完成时,片段关闭其 outbound channel 。
- 片段不断接收来自 inbound channel 的值,直到这些 channel 关闭。
该模式允许每个接收片段使用 range loop ,
以确保所有值都已成功发送到下游后退出所有 goroutine
。
但是在实际管道应用中,片段并不总是能够收到所有 inbound 值。 有时这是被设计:接收者可能只需要一个子集来进行后续处理。 更常见的情况是,由于 inbound 值显示在早期片段引入了错误,因此片段应该早早的退出。 在以上任一情况下,接收者不必等待剩余的值到达,并且我们希望较早的片段停止生成稍后片段不需要的值。
在上面的管道示例中,如果片段无法使用所有 inbound 值,则尝试发送这些值的 goroutines
将无限期地阻塞下去:
1 | func main() { |
这是一个资源泄漏: goroutines
消耗内存和运行时资源, goroutine
栈中的堆引用使数据不被垃圾回收。
goroutines
不被垃圾收集机制回收; 它们必须自己退出。
即使下游片段没能接收所有 inbound 值,管道的上游片段也可能需要提前退出。 执行此操作的一种方法是将 outbound channel 更改为具有缓冲区。 缓冲区可以保存固定数量的值; 如果缓冲区中有空间,则发送立即完成 :
1 | c := make(chan int, 2) // buffer size 2 |
在创建 channel
时,若是知道将要发送的数据量,缓冲区可以简化代码。
例如,可重写 gen 拷贝整数到 channel
的缓冲区中, 避免 goroutine
创建:
1 | func gen(nums ...int) <-chan int { |
回到管道中阻塞的 goroutine
,可以考虑为 merge 返回的 outbound channel 添加一个缓冲区:
1 | func merge(cs ...<-chan int) <-chan int { |
虽然上面的代码不再阻塞,但上面的代码依赖于当前是知道将要接收到多少数据的和要往下流发送多少数据。 这样的代码不健壮,如果 gen 接收的数据多于 1 , 或者下流只接收一部分值,那么将会永久的阻塞 ~goroutine~。 固定长度的缓存不可取,相应的在代码中需要为下流片段提供一种通用的方法,来通知上流片段它们将停止接收输入。
5 明确取消
当 main 函数决定不再从上游片段/outbound channel/ 接收数据时,
它需要告诉上流片段的 goroutine
终止发送操作。
下面的代码演示了如何通知, 通过往称为 done 的 channel
上发送值来实现。
发送两个值,因为有两个被阻止的发送者:
1 | func main() { |
发送 goroutines
使用 select 语句替换其发送操作,该语句在发送发生时或从 done 接收到值时继续进行。
done 值类型是空结构体,因为该值无关紧要:它只是接收事件,表示应该放弃后续发送。
输出 goroutine
在其 inbound channel 上继续循环,因此上游片段不被阻塞。
1 | func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { |
上面的方法有一个问题:每个下游接收者都需要知道潜在阻塞的上游发送者的数量, 并安排在提前返回时向这些发送者发信号。 跟踪这些计数是乏味和容易出错的。
其实我们需要一种方法来告诉未知的无限数量的 goroutine
停止往下游发送它们的值。
在 Go 中,可以通过关闭 channel 来执行此操作,
因为关闭 channel 上的接收操作都是立刻完成的,产生相应数据类型的零值。
这意味着 main 函数可以通过关闭 done channel 来解除所有发件人的阻塞。 这个关闭操作实际上是发送者的广播信号。 重新编排管道函数, 添加 done channel 的延迟关闭函数 , 从而使 main 函数的所有返回路径都会发出信号,以使管道片段退出。
1 | func main() { |
现在每个管道片段可以在 channel
关闭后轻松的返回, merge 中的 output routine 不用担心 inbound channel 的数据,因为
当 done channel 关闭时,上游发送者会停止数据的发送。 output 通过 defer 语句确保在所有返回路径上调用 wg.Done :
1 | func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { |
类型的, sq 可在 done 关闭后直接返回。 sq 通过 defer 语句确保在所有返回路径上关闭 out channel :
1 | func sq(done <-chan struct{}, in <-chan int) <-chan int { |
gen 大体和 sq 类型, 在 done 返回, 通过 defer 语句确保 out channel 关闭:
1 | func gen(done <-chan struct{}, nums ...int) <-chan int { |
管道构建的指导方针:
- 当所有发送操作完成时,片段关闭其 outbound channel 。
- 片段持续从 inbound channel 中接收值,直到这些 channel 关闭或发件人被取消阻塞。
管道有两种方式能解除发送者的阻塞:
- 确保所有发送的值都有足够的缓冲区, 有足够的缓冲区就不会阻塞了。
- 当接收方放弃从 channel 接收数据时,显式地发送信号来解除发送者的阻塞
6 MD5摘要示例
现在来看看更真实的管道应用。
MD5
是一种消息摘要算法,可用作文件校验和。 命令行实用程序 md5sum 打印文件列表的摘要值。
% md5sum *.go d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
示例程序像 md5sum ,但是以单个目录作为参数,并打印该目录下每个常规文件的摘要值,并按路径名排序。
% go run serial.go . d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
示例程序的 main 函数 调用一个辅助函数 MD5All ,它返回一个从路径名到摘要值的 map ,然后排序并打印结果:
1 | func main() { |
6.1 串行版
MD5All 函数是讨论的焦点。 在 serial.go 中的实现不使用并发性,只是在遍历文件树时读取和计算校验和。
1 | // MD5All reads all the files in the file tree rooted at root and returns a map |
6.2 并行版
在 parallel.go ,将 MD5All 函数分为两级管道。 第一级, sumFiles ,遍历树,
为每个文件做校验和创建 goroutine
, 并将结果发送到 result 类型的 channel 上:
1 | type result struct { |
sumFiles 返回两个 channel
:一个用于传递结果,另一个用于返回 filepath.Walk 返回的错误。
walk 函数启动一个新的 goroutine
来处理每个常规文件,然后检查 done 。 如果完成关闭, walk 将立即停止:
1 | func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { |
MD5All 从 channel
中接受摘要值,返回最早出现的错误,并通过 defer 来关闭 done :
1 | func MD5All(root string) (map[string][md5.Size]byte, error) { |
6.3 受限的并行
在 parallel.go 的 MD5All 实现中为每个文件创建一个 goroutine
。
试想一下,若是一个目录中有许多大文件,上面的实现,很可能导致资源枯竭。
可以通过限制同时打开的问题个数来解决资源占用的问题。在 bounded.go 中, 创建固定数量的用于读取文件的 goroutine
。
重新设计流程,包含三个片段: 遍历目录树, 读取文件生成摘要,收集摘要。
第一个片段 walkFiles , 过滤出常规文件路径,同时往下游发送:
1 | func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { |
中间片段启动固定数量的 digester goroutine
从 paths channel
中接受文件名, 通过 c channel
来回写摘要值:
1 | func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { |
与以前的示例不同,/digester/ 不会关闭其输出 channel
,因为多个 goroutine
正在共享 channel
上发送。
相反,当所有的 digester 完成时,MD5All中的代码会关闭 channel
:
1 | // MD5All reads all the files in the file tree rooted at root and returns a map |
可以让每个 digester 创建并返回自己的输出 channel
,但是后面需要额外的 goroutine
来扇入结果。
最后片段接收 c 的所有结果,然后从 errc 检查错误。 此检查不能早于从 c 中接受数据,因为在此之前, walkFiles 可能会被下游阻塞。
7 总结
本文介绍了Go中构建管道流的技术。
处理这种管道中的故障是很棘手的,因为管道中的任一片段都可能被尝试发送下游值而阻塞,并且下游片段可能也不再关心或者需要输入数据。
本文展示了如何关闭 channel
方式来向管道启动的所有 goroutines
广播“完成”信号,并且正确地定义了管道构建的准则。
Render by hexo-renderer-org with Emacs 25.3.2 (Org mode 8.2.10)