简单介绍go中的并发编程. 涉及内容主要为goroutine, goroutine间的通信(主要是channel), 并发控制(等待、退出).
想查看更多与Go相关的内容, 可以查看我的Go编程栏目
Goroutine
语法
在一个函数调用前加上go即可, go func()
. 语法很简单, 可以说是并发写起来最简单的程序语言了.
goroutine与线程
开始可能会把goroutine当做线程来看, 在我们这的计算密集型任务中, 确实可以认为和线程差不多, 但在I/O比较多的任务中, 就能看到作为协程的一面了. 在go中, goroutine数与线程数可以是m对n的关系, 即m个goroutine运行在n个线程上, 可以认为一个线程能调度执行多个goroutine, 线程内部调度goroutine比线程间的切换调度开销小很多, 这也是协程的优势. 和python那样的协程比起来, goroutine除了能通过阻塞、系统调用让出线程之外, 还能被调度(抢占式调度), 避免一些goroutine执行时间过长, 导致其他goroutine饥饿.
G-M-P模型动态演示
不过在计算密集型的任务中协程并没有什么优势, 要计算的任务量是固定的, 过多的协程调度反而降低效率. 所以在我们这写代码的时候, 一般是把goroutine当作线程来用的, 根据cpu核数来创建goroutine, 这要根据具体的任务类型来考虑.
通信
从创建goroutine的语法可以看到, 并没有一个对应函数返回值的方法. 如果想在创建goroutine的协程中获取返回值需要进行goroutine间的通信, 常用的为channel, 和基于共享变量的通信. 通信的用途很广.
闭包
一个函数和其词法环境的引用绑定在一起, 是一个闭包.
func closure() func() int {
tmp := 1
return func() int {
tmp++
return tmp
}
}
func main() {
test1 := closure()
test2 := closure()
test1() // 2
test1() // 3
test2() // 2
}
其中tmp
本来是closure
函数中的一个局部变量, 但是closure
的返回值是一个闭包函数, 其中引用了tmp
, 那tmp
就不能随着closure
的结束而销毁, 会逃逸到堆上. 有点像创建了一个对象, 对象中有个成员变量tmp
, 成员方法执行时会引用该变量.
go的闭包用着也挺方便的, 不过局部变量逃逸到堆上也会引起一些额外开销, 本来在栈上创建变量, 随着栈销毁, 变量也自动销毁, 但如果逃逸到堆上就需要通过gc来回收. 除了闭包也会有其他一些情况引起逃逸, 如使用了interface{}动态类型, 栈空间不足等.
闭包也容易引起一些问题, 在闭包中引用的变量, 可以认为是使用了它的引用(指针), 这样就容易引发一些错误.
func main() {
s := []int{1, 2, 3, 4}
for i, elem := range s {
go func() {
fmt.Println(elem) // 引用的都是elem的地址
}()
}
}
func runTime() {
start := time.Now()
defer fmt.Println(time.Since(start)) // 0
defer func() {
fmt.Println(time.Since(start)) // 预期的时间
}()
...
}
基于共享变量的通信
和其他编程语言类似, 可以通过加锁的方式来比较安全地对变量进行并发方法. sync.Mutex、sync.RWMutex
, 要注意的是锁被创建之后就不能拷贝了, 要传递锁(作为参数等)只能传引用, 这和go的实现有关, 要传引用也可以理解, 要保证大家用的是同一把锁, 才能起到控制访问的功能.
基于Channel
Channel是go中推荐使用的通信方式, 一个channel可以认为是一个线程安全的消息队列, 先进先出.
-
语法
Go Channel详解
-
一些特殊情况
- 向已经关闭的channel或为nil的channel中写, 会引发panic
- 从为nil的channel中读, 会永久阻塞
- 从已经关闭的channel中读, 如果channel内已经没有数据了, 会返回相应零值, 可以用
elem, ok := <-ch
, 使用ok
来判断获取的值是不是有效值.
-
非阻塞式收发
正常使用channel进行数据的收发都是阻塞式的, 如果channel缓存已满, 再往里写就会阻塞, 如果channe中没有数据, 尝试读的话也会引起阻塞. 要实现非阻塞式的channel访问, 使用select.select
是go中一个特殊语法, 看起来和switch有点像.
select {
case ele := <-readCh:
case e -> writeCh:
case <-checkCh:
default:
...
}
select
语句的效果是看各个case
的channel操作是否可以完成(不会被阻塞), 如果有, 从所有可以执行的case
中随机选一个执行, 如果没有看有没有default
语句, 有的话执行defalut
语句, 如果还是没有的话挂起, 等待可执行条件.
::: warning
一些特殊情况:
- 空的
select
语句, 也就是select{}
会使当前goroutine直接挂起, 永远无法被唤醒 - 只有一个
case
, 和直接使用channel效果是一样的 - 从已关闭的channel中读, 是直接可执行的
:::
可以简单了解一下select
语句的实现, 一些特殊情况会单独处理, 常规逻辑是这样的:
- 以一定顺序锁定所有
case
中的channel, 再根据随机生成的轮询顺序, 遍历各个case
查找是否有可以立即执行的case
, 有的话选定对应的case
执行, 解锁各channel - 如果没有可以立即执行的
case
, 也没有default
, 将当前goroutine加入到所有相关channel的收发队列中, 将自己挂起 - 当该goroutine再次被唤醒时, 再锁定各个
case
, 如此循环
并发控制
退出
一个goroutine不能直接停止另外一个goroutine, 如果可以的话可能会导致goroutine之间的共享变量落在未定义的状态上, 所以只能让goroutine自己退出.
- 利用
select
和被关闭的channel的性质, 能实现简单的退出
control := make(chan struct{})
inData := make(chan int, 2)
go func() {
forTag:
for {
select{
case <- control:
for data := range inData {
// do something
}
// 退出
break forTag
case data := <- inData:
// do something
}
}
}()
inData <- 1
inData <- 2
time.Sleep(time.Second)
close(control)
- 使用
Context
Context
在本质上和上面的做法是类似的, 通过关闭channel来进行消息传递, 不过做了些封装, 使用更方便一些.
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
go handle(ctx, 1500*time.Millisecond)
select {
case <-ctx.Done():
fmt.Println("main", ctx.Err())
}
}
func handle(ctx context.Context, duration time.Duration) {
select {
case <-ctx.Done():
fmt.Println("handle", ctx.Err())
case <-time.After(duration):
fmt.Println("process request with", duration)
}
}
等待
很多时候一个goroutine要等待其他一些goroutine结束之后再执行后续流程, 比如两个任务有前后依赖关系, 可以利用channel的阻塞进行等待.
- goroutine结束之后发送完成信号
workerNum := 10
finishCh := make(chan struct{}, workerNum)
worker := func() {
// do something
finish <- struct{}{}
}
for i := 0; i < workerNum; i++ {
go worker()
}
// 等待
for i := 0; i < workerNum; i++ {
<-finish
}
- 利用
sync.WaitGroup
(类似信号量)
workerNum := 10
wg := &sync.WaitGroup{}
// 要稍微注意一下Add和Done的位置
wg.Add(workerNum)
worker := func() {
// do something
wg.Done()
}
for i := 0; i < workerNum; i++ {
go worker()
}
// 等待
wg.Wait()
- 用上面提到的
select
语句
系统中的一些应用
- map-reduce
- 几个任务流顺序执行
- 递归中的并发数控制
性能分析工具
- benchmark基准测试
- pprof
参考资料
- 《Go语言圣经》
- 《Go语言高级编程》
- 《Go语言设计与实现》
- 《Go语言高性能编程》