文章目录
- 前言
- 创建模式
- 退出模式
- 分离模式
- join 模式
- notify-and-wait模式
- 退出模式的应用
- 管道模式
- 扇出与扇入模式
- 超时与取消模式
前言
在语言层面,Go针对CSP模型提供了三种并发原语。
goroutine:对应CSP模型中的P,封装了数据的处理逻辑,是Go运行时调度的基本执行单元。
channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步。
select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。
通过不同的组合方式,将产生多种不同的并发模式,灵活多变。
创建模式
在函数内部创建一个 goroutine,并返回一个 channel 类型变量的函数,这样调用方与被调用方通信就打通了。
- 创建模式是应用场景最多的一种模式,很多并发模式都是基于创建模式实现的
退出模式
有一个常驻的后台服务程序可能会对 goroutine 有着优雅退出的要求,这个时候就需要使用退出模式,退出模式有多种模式。
分离模式
这里借鉴了一些线程模型中的术语,比如分离(detached)模式。分离模式是使用最为广泛的goroutine退出模式。
对于分离的goroutine,创建它的goroutine不需要关心它的退出,这类goroutine在启动后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。
这类goroutine有两个常见用途:
- 一次性任务:顾名思义,新创建的goroutine用来执行一个简单的任务,执行后即退出。比如下面标准库中的代码:
- 常驻后台执行一些特定任务,如监视(monitor)、观察(watch)等。其实现通常采用for {…}或for { select{…} }代码段形式,并多以定时器(timer)或事件(event)驱动执行。
join 模式
在线程模型中,父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。
在Go中,我们有时候也有类似的需求:goroutine的创建者需要等待新goroutine结束。
为这样的goroutine退出模式起名为“join模式”
-
等待一个goroutine退出
package main import "time" func worker(args ...interface{}) { if len(args) == 0 { return } interval, ok := args[0].(int) if !ok { return } time.Sleep(time.Second * (time.Duration(interval))) } func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} { c := make(chan struct{}) go func() { f(args...) c <- struct{}{} }() return c } func main() { done := spawn(worker, 5) println("spawn a worker goroutine") <-done println("worker done") }
spawn函数使用典型的goroutine创建模式创建了一个goroutine,main goroutine作为创建者通过spawn函数返回的channel与新goroutine建立联系,这个channel的用途就是在两个goroutine之间建立退出事件的“信号”通信机制。
main goroutine在创建完新goroutine后便在该channel上阻塞等待,直到新goroutine退出前向该channel发送了一个信号。 -
获取goroutine的退出状态
如果新goroutine的创建者不仅要等待goroutine的退出,还要精准获取其结束状态,同样可以通过自定义类型的channel来实现这一场景需求。下面是基于上面的代码改造后的示例:
package main import ( "errors" "fmt" "time" ) var OK = errors.New("ok") func worker(args ...interface{}) error { if len(args) == 0 { return errors.New("invalid args") } interval, ok := args[0].(int) if !ok { return errors.New("invalid interval arg") } time.Sleep(time.Second * (time.Duration(interval))) return OK } func spawn(f func(args ...interface{}) error, args ...interface{}) chan error { c := make(chan error) go func() { c <- f(args...) }() return c } func main() { done := spawn(worker, 5) println("spawn worker1") err := <-done fmt.Println("worker1 done:", err) done = spawn(worker) println("spawn worker2") err = <-done fmt.Println("worker2 done:", err) }
-
等待多个goroutine退出
通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出的模式:package main import ( "fmt" "sync" "time" ) func worker(args ...interface{}) { if len(args) == 0 { return } interval, ok := args[0].(int) if !ok { return } time.Sleep(time.Second * (time.Duration(interval))) } func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} { c := make(chan struct{}) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func(i int) { name := fmt.Sprintf("worker-%d:", i) f(args...) println(name, "done") wg.Done() // worker done! }(i) } go func() { wg.Wait() c <- struct{}{} }() return c } func main() { done := spawnGroup(5, worker, 3) println("spawn a group of workers") <-done println("group workers done") }
-
支持超时机制的等待
通过一个定时器(time.Timer)设置了超时等待时间,并通过select原语同时监听timer.C和done这两个channel,哪个先返回数据就执行哪个case分支:package main import ( "fmt" "sync" "time" ) func worker(args ...interface{}) { if len(args) == 0 { return } interval, ok := args[0].(int) if !ok { return } time.Sleep(time.Second * (time.Duration(interval))) } func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} { c := make(chan struct{}) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func(i int) { name := fmt.Sprintf("worker-%d:", i) f(args...) println(name, "done") wg.Done() // worker done! }(i) } go func() { wg.Wait() c <- struct{}{} }() return c } func main() { done := spawnGroup(5, worker, 30) println("spawn a group of workers") timer := time.NewTimer(time.Second * 5) defer timer.Stop() select { case <-timer.C: println("wait group workers exit timeout!") case <-done: println("group workers done") } }
notify-and-wait模式
在前面的几个场景中,goroutine的创建者都是在被动地等待着新goroutine的退出。
但很多时候,goroutine创建者需要主动通知那些新goroutine退出,尤其是当main goroutine作为创建者时。main goroutine退出意味着Go程序的终止,而粗暴地直接让main goroutine退出的方式可能会导致业务数据损坏、不完整或丢失。
我们可以通过notify-and-wait(通知并等待)模式来满足这一场景的要求。虽然这一模式也不能完全避免损失,但是它给了各个goroutine一个挽救数据的机会,从而尽可能减少损失。
-
通知并等待一个goroutine退出
通过一个双向 channel,同时向 goroutine 发送退出退出信号,并接收 goroutine 的退出状态:package main import "time" func worker(j int) { time.Sleep(time.Second * (time.Duration(j))) } func spawn(f func(int)) chan string { quit := make(chan string) go func() { var job chan int // 模拟job channel for { select { case j := <-job: f(j) case <-quit: quit <- "ok" return } } }() return quit } func main() { quit := spawn(worker) println("spawn a worker goroutine") time.Sleep(5 * time.Second) // notify the child goroutine to exit println("notify the worker to exit...") quit <- "exit" timer := time.NewTimer(time.Second * 10) defer timer.Stop() select { case status := <-quit: println("worker done:", status) case <-timer.C: println("wait worker exit timeout") } }
-
通知并等待多个goroutine退出
Go语言的channel有一个特性是,当使用close函数关闭channel时,所有阻塞到该channel上的goroutine都会得到通知。我们就利用这一特性实现满足这一场景的模式:
package main import ( "fmt" "sync" "time" ) func worker(j int) { time.Sleep(time.Second * (time.Duration(j))) } func spawnGroup(n int, f func(int)) chan struct{} { quit := make(chan struct{}) job := make(chan int) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func(i int) { defer wg.Done() // 保证wg.Done在goroutine退出前被执行 name := fmt.Sprintf("worker-%d:", i) for { j, ok := <-job if !ok { println(name, "done") return } // do the job worker(j) } }(i) } go func() { <-quit close(job) // 广播给所有新goroutine wg.Wait() quit <- struct{}{} }() return quit } func main() { quit := spawnGroup(5, worker) println("spawn a group of workers") time.Sleep(5 * time.Second) // notify the worker goroutine group to exit println("notify the worker group to exit...") quit <- struct{}{} timer := time.NewTimer(time.Second * 5) defer timer.Stop() select { case <-timer.C: println("wait group workers exit timeout!") case <-quit: println("group workers done") } }
此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过“comma ok”模式获取的ok值为false,也就表明该channel已经被关闭,于是worker goroutine执行退出逻辑(退出前wg.Done()被执行)。
退出模式的应用
很多时候,我们在程序中要启动多个goroutine协作完成应用的业务逻辑,比如:
但这些goroutine的运行形态很可能不同,有的扮演服务端,有的扮演客户端,等等,因此似乎很难用一种统一的框架全面管理它们的启动、运行和退出。我们尝试将问题范围缩小,聚焦在实现一个“超时等待退出”框架,以统一解决各种运行形态goroutine的优雅退出问题。
我们来定义一个接口:
这样,凡是实现了该接口的类型均可在程序退出时得到退出的通知和调用,从而有机会做退出前的最后清理工作。这里还提供了一个类似http.HandlerFunc的类型ShutdownerFunc,用于将普通函数转化为实现了GracefullyShutdowner接口的类型实例(得益于函数在Go中为“一等公民”的特质):
一组goroutine的退出
总体上有两种情况:
-
一种是并发退出,在这类退出方式下,各个goroutine的退出先后次序对数据处理无影响,因此各个goroutine可以并发执行退出逻辑;
- 为每个传入的GracefullyShutdowner接口实现的实例启动一个goroutine来执行退出逻辑,并将timeout参数传入每个实例的Shutdown方法中;
- 通过sync.WaitGroup在外层等待每个goroutine的退出
- 通过select监听一个退出通知channel和一个timer channel,决定到底是正常退出还是超时退出
-
另一种则是串行退出,即各个goroutine之间的退出是按照一定次序逐个进行的,次序若错了可能会导致程序的状态混乱和错误。
- 串行退出有个问题是waitTimeout值的确定,因为这个超时时间是所有goroutine的退出时间之和。
- 在上述代码里,将每次的left(剩余时间)传入下一个要执行的goroutine的Shutdown方法中。
- select同样使用这个left作为timeout的值(通过timer.Reset重新设置timer定时器周期)
完整代码
package main import ( "errors" "sync" "time" ) type GracefullyShutdowner interface { Shutdown(waitTimeout time.Duration) error } type ShutdownerFunc func(time.Duration) error func (f ShutdownerFunc) Shutdown(waitTimeout time.Duration) error { return f(waitTimeout) } func ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error { c := make(chan struct{}) go func() { var wg sync.WaitGroup for _, g := range shutdowners { wg.Add(1) go func(shutdowner GracefullyShutdowner) { defer wg.Done() shutdowner.Shutdown(waitTimeout) }(g) } wg.Wait() c <- struct{}{} }() timer := time.NewTimer(waitTimeout) defer timer.Stop() select { case <-c: return nil case <-timer.C: return errors.New("wait timeout") } } func SequentialShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error { start := time.Now() var left time.Duration timer := time.NewTimer(waitTimeout) for _, g := range shutdowners { elapsed := time.Since(start) left = waitTimeout - elapsed c := make(chan struct{}) go func(shutdowner GracefullyShutdowner) { shutdowner.Shutdown(left) c <- struct{}{} }(g) timer.Reset(left) select { case <-c: //continue case <-timer.C: return errors.New("wait timeout") } } return nil }
管道模式
每个数据处理环节都由一组功能相同的goroutine完成。在每个数据处理环节,goroutine都要从数据输入channel获取前一个环节生产的数据,然后对这些数据进行处理,并将处理后的结果数据通过数据输出channel发往下一个环节。
package main
func newNumGenerator(start, count int) <-chan int {
c := make(chan int)
go func() {
for i := start; i < start+count; i++ {
c <- i
}
close(c)
}()
return c
}
func filterOdd(in int) (int, bool) {
if in%2 != 0 {
return 0, false
}
return in, true
}
func square(in int) (int, bool) {
return in * in, true
}
func spawn(f func(int) (int, bool), in <-chan int) <-chan int {
out := make(chan int)
go func() {
for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
}()
return out
}
func main() {
in := newNumGenerator(1, 20)
// 流水线:过滤偶数 -》求平方
out := spawn(square, spawn(filterOdd, in))
for v := range out {
println(v)
}
}
扇出与扇入模式
扇出模式
- 多个功能相同的goroutine从同一个channel读取数据并处理,直到该channel关闭,这种情况被称为“扇出”
- 使用扇出模式可以在一组goroutine中均衡分配工作量,从而更均衡地利用CPU。
扇入模式
- 把所有输入channel的数据汇聚到一个统一的输入channel,然后处理程序再从这个channel中读取数据并处理,直到该channel因所有输入channel关闭而关闭。
package main
import (
"fmt"
"sync"
"time"
)
func newNumGenerator(start, count int) <-chan int {
c := make(chan int)
go func() {
for i := start; i < start+count; i++ {
c <- i
}
close(c)
}()
return c
}
func filterOdd(in int) (int, bool) {
if in%2 != 0 {
return 0, false
}
return in, true
}
func square(in int) (int, bool) {
return in * in, true
}
func spawnGroup(name string, num int, f func(int) (int, bool), in <-chan int) <-chan int {
groupOut := make(chan int)
var outSlice []chan int
for i := 0; i < num; i++ {
out := make(chan int)
go func(i int) {
name := fmt.Sprintf("%s-%d:", name, i)
fmt.Printf("%s begin to work...\n", name)
for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
fmt.Printf("%s work done\n", name)
}(i)
outSlice = append(outSlice, out)
}
// Fan-in
//
// out --\
// \
// out ---- --> groupOut
// /
// out --/
//
go func() {
var wg sync.WaitGroup
for _, out := range outSlice {
wg.Add(1)
go func(out <-chan int) {
for v := range out {
groupOut <- v
}
wg.Done()
}(out)
}
wg.Wait()
close(groupOut)
}()
return groupOut
}
func main() {
in := newNumGenerator(1, 20)
out := spawnGroup("square", 2, square, spawnGroup("filterOdd", 3, filterOdd, in))
time.Sleep(3 * time.Second)
for v := range out {
fmt.Println(v)
}
}
超时与取消模式
编写一个从气象数据服务中心获取气象信息的客户端。该客户端每次会并发向三个气象数据服务中心发起数据查询请求,并以最快返回的那个响应信息作为此次请求的应答返回值。
要求在 500 ms 内返回响应结果,否则关闭请求。
package main
import (
"context"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"time"
)
type result struct {
value string
}
func first(servers ...*httptest.Server) (result, error) {
c := make(chan result)
// 使用 context,并且将 ctx 传进 goroutine 的请求中,保证 first 退出时,
// goroutine 同时退出,释放资源
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queryFunc := func(i int, server *httptest.Server) {
url := server.URL
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Printf("query goroutine-%d: http NewRequest error: %s\n", i, err)
return
}
req = req.WithContext(ctx)
log.Printf("query goroutine-%d: send request...\n", i)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("query goroutine-%d: get return error: %s\n", i, err)
return
}
log.Printf("query goroutine-%d: get response\n", i)
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
c <- result{
value: string(body),
}
return
}
// 启动多个 goroutine 同时请求数据
for i, serv := range servers {
go queryFunc(i, serv)
}
select {
case r := <-c: // 获取响应最快的结果
return r, nil
case <-time.After(500 * time.Millisecond): // 500 ms 内没有返回数据超时退出
return result{}, errors.New("timeout")
}
}
// 模拟请求
func fakeWeatherServer(name string, interval int) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Printf("%s receive a http request\n", name)
// 模拟延时
time.Sleep(time.Duration(interval) * time.Millisecond)
w.Write([]byte(name + ":ok"))
}))
}
func main() {
result, err := first(fakeWeatherServer("open-weather-1", 200),
fakeWeatherServer("open-weather-2", 1000),
fakeWeatherServer("open-weather-3", 600))
if err != nil {
log.Println("invoke first error:", err)
return
}
fmt.Println(result)
time.Sleep(10 * time.Second)
}
每日一图,赏心悦目