目录
【sync.Cond】
【atomic原子性】
【sync.Once】
使用sync.Once实现单例模式
在 上一篇文章 中分析了Go语言sync 包中 sync.Mutex、sync.RWMutex和sync.WaitGroup的用法,这篇文章继续来讨论下sync包中关于 sync.Cond 、atomic原子性 和 sync.Once 的用法。
【sync.Cond】
sync.Cond表示条件变量,把一个条件变量理解为一个容器,这个容器中存放着一个或一组等待着某个条件成立的 Goroutine;当条件成立后,这些处于等待状态的 Goroutine 将被通知并被唤醒继续后续的工作。如果没有条件变量,开发人员可能需要在 Goroutine 中通过连续轮询的方式检查某条件是否为真,这种连续轮询非常消耗资源,因为 Goroutine 在这个过程中是处于活动状态的,但它的工作又没有进展。
举个例子:这里的条件变量可以理解为微信发消息的推送通知提醒,如果没有新消息的提醒,你需要隔一会儿去打开微信看看有没有回复,有可能看了十几次依然没有回复,浪费时间和精力。但是有了消息提醒,你就能第一时间知道来新消息了,然后点进去接收消息。这里的推送提醒就相当于条件变量,在goroutine中就是共享资源的状态产生变化的时候起到通知的作用。
条件变量可以用在唤醒一个或者所有的等待者做一些检查操作的时候,比如说在一个限定容量的队列中,当队列已满或者数据为空的时候,可以利用这种 等待/通知 机制实现阻塞或者唤醒。
条件变量需要有一个前置条件,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。
sync.Cond 源码:
// $GOROOT/src/sync/cond.go
type Cond
func NeWCond(l Locker) *Cond //Cond 关联的 Locker 实例可以通过 c.L 访问,它内部维护着一个先入先出的等待队列
func (c *Cond) Wait() //把调用者放入Cond等待队列中并阻塞,直到被Signal或者Broadcast的方法从等待队列中移除并唤醒(要求必须持有 c.L 的锁)
func (c *Cond) Signal() //如果Cond等待队列中有一个或者多个等待的goroutine,则从等待队列中移除第一个goroutine并把它唤醒(不要求持有 c.L 的锁)
func (c *Cond) Broadcast() //如果Cond等待队列中有一个或者多个等待的goroutine,则清空所有等待的 goroutine,并全部唤醒(不要求持有 c.L 的锁)
// 只能利用sync.NewCond函数创建它的指针值。这个函数需要一个sync.Locker类型的参数值
// $GOROOT/src/sync/mutex.go
type Locker interface {
Lock()
Unlock()
}
条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用,条件变量提供的方法有三个:等待通知(wait)、单发通知(signal)、广播通知(broadcast)。使用条件变量等待通知的时候需要处于互斥锁锁定的情况,单发通知或广播通知的时候需要处于互斥锁解锁的情况。当共享资源的状态发生变化时,条件变量可以被用来通知被互斥锁阻塞的线程。条件变量的最大优势就是在效率方面的提升,当共享资源的状态不满足条件的时候,想操作它的线程再也不用循环往复地做检查了,只要等待通知就好了。
再举个例子:会议前每个人需要到现场签到,每个人到场的时间不确定,只要有人来签到,就要 调用 Broadcast或者Signal方法 通知(也就是唤醒)主持人并做一次登记(一次性来两个人也是只登记一次),登记的同时主持人需要判断是否所有参会人都已经到场,如果全部到场则宣布“会议开始”,否则继续等待。调用 Broadcast或者Signal方法的时候没有使用锁,只是在更改等待条件(到场人+1)的时候才使用到了锁。对应的代码示例如下:
//basic/go03/sync2/cond1.go
package main
import (
"log"
"math/rand"
"sync"
"time"
)
func test1() {
c := sync.NewCond(&sync.Mutex{})
var readyCount int
count := 5
for i := 1; i <= count; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second) //随机等待时长,模拟不定时有人到场
c.L.Lock()
readyCount++
c.L.Unlock()
log.Printf("第 %d 个人已进入会场\n", i)
c.Broadcast()
}(i)
}
c.L.Lock()
for readyCount != count {
c.Wait()
log.Println("主持人已登记")
}
c.L.Unlock()
log.Println("所有人都已进入会场,会议开始")
}
func main() {
test1()
}
由于使用随机时长模拟了不同时间间隔到场签到的人,因此上面第一次运行表示每个人单独到场,第二次出现了有两个人一起到场的情况。
注意,调用 cond.Wait 方法之前一定要加锁。如果在调用Wait的时候把前后的 Lock 或者 Unlock 漏掉了,将会引发严重错误:fatal error: sync: unlock of unlocked mutex
问题的原因在于 cond.Wait 需要把当前调用者加入到 notify 队列之中然后释放锁并一直等待,如果不释放锁,其他 Wait 的调用者就没有机会加入到 notify 队列中;等调用者被唤醒之后又会去争抢这把锁;如果调用 Wait 之前不加锁的话就有可能 Unlock 一个未加锁的 Locker。
还有一个注意点,就是判断到场人数的的时候需要使用 for 循环而不是if判断一次,否则会出现判断失误的情况:
sync.Cond 细节总结:
- 如果一个 goroutine 因收到通知而被唤醒,但却发现共享资源的状态依然不符合它的要求,那么就应该再次调用条件变量的Wait方法并继续等待下次通知的到来(循环);
- 条件变量的Signal方法和Broadcast方法都是被用来发送通知的,Signal只会唤醒一个等待的 goroutine,Broadcast会唤醒所有等待的 goroutine;
- 条件变量的通知具有即时性,如果发送通知的时候没有 goroutine 在等待,那么该通知就会被直接丢弃,在这之后才开始等待的 goroutine 只可能被后面的通知唤醒。
sync.Cond 其实不是很好驾驭,原因有下面几点:
- 有时候需要加锁,有时候可以不加;
- Wait 唤醒后需要检查条件;
- 条件变量的更改需要原子操作或者互斥锁保护
【atomic原子性】
使用 sync.Mutex和sync.RWMutex可以实现互斥锁,使用互斥锁虽然可以保证临界区中代码的串行执行,但却不能保证这些代码执行的原子性(atomicity),因为Go语言的调度器会频繁地切换这些 goroutine的运行状态,就像在 Go语言的并发:goroutine和channel_浮尘笔记的博客-CSDN博客 这篇文章开头描述并发的逻辑:并发指的是一个时间段中有几个程序(线程/协程)都处于已启动运行到运行完毕之间,但任一个时刻点上只有一个程序在处理机上运行。因此任何两条语句执行的间隙或者是同一条语句执行的过程中都有可能导致协程被中断,从而不满足原子性。
比如执行 c:=a+b 这条语句,转换成机器指令分为三步,这 3 条指令在执行过程中是可以被中断的,因为有可能这个操作不是由一个 CPU 指令来实现的:
- LOAD:将变量从内存加载到 CPU 寄存器;
- ADD:执行加法指令;
- STORE:将结果存储回原内存地址中
原子操作的指令是不可以被中断的,就像是数据库的事务一样,要么不执行要么全部执行。因此,原子操作也可以被用于共享数据的并发同步。
Go语言中关于原子性相关的函数在 标准库 sync/atomic中,在 $GOROOT/src/sync/atomic/doc.go 中可以看到针对 int32、int64、uint32、uint64、uintptr、Pointer 这些数据类型的函数支持(Pointer不支持Add方法):
使用 atomic 的方法可以实现更底层的优化,如果使用 Mutex 虽然也可以解决问题,但是实现逻辑比较复杂,对性能会有一定影响。
比如需要记录一个状态值0或者1,可以使用Mutex/RWMutex加锁实现互斥,保证同一时刻只有一个goroutine拿到了正确的状态值;但其实仔细想想这个过程是不涉及到对资源的竞争的,只需要一个原子性的标记而已,因此可以改为使用atomic原子操作做:使用uint32类型的变量来记录这里的0和1。原子操作函数的执行速度要比互斥锁快得多。
sync/atomic中提供的原子操作有以下这些:
- Add(加法):func AddInt32(addr *int32, delta int32) (new int32),就是给第一个参数地址中的值增加一个 delta 值;
- CAS(比较并交换,compare and swap):func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool),比较当前 addr 地址里的值如果不等于 old 就返回 false;如果等于 old 就把此地址的值替换成 new 值并返回 true;
- Swap(交换):func SwapInt32(addr *int32, new int32) (old int32),如果不需要比较旧值直接替换就可以使用 Swap 方法;
- Load(加载):func LoadInt32(addr *int32) (val int32),即使在多处理器、多核、有 CPU cache 的情况下,也能原子操作的取出 addr 地址中的值;
- Store(存储):func StoreInt32(addr *int32, val int32),即使在多处理器、多核、有 CPU cache 的情况下,也能原子操作的把一个值存储到 addr 地址中;
- Value类型:原型操作的存储任意类型的值,但不能 CAS 和 Swap,常常用在配置变更等场景中。
//basic/go03/sync2/atomic.go
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
type Config struct { //定义一个配置信息的结构体
dbtype string
ip string
time string
}
func loadNewConfig() Config {
return Config{
dbtype: "mysql",
ip: "127.0.0.1",
time: time.Now().Format("2006-01-02 15:04:05"),
}
}
func main() {
var config atomic.Value
config.Store(loadNewConfig())
var cond = sync.NewCond(&sync.Mutex{})
go func() { // 启动一个 goroutine 用来变更配置信息
for {
time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second) //休眠随机时长
config.Store(loadNewConfig()) // 更新配置
cond.Broadcast() // 通知等待着配置已变更
}
}()
go func() { // 启动一个 goroutine 加载最新的配置信息
for {
cond.L.Lock()
cond.Wait() // 等待变更信号
c := config.Load().(Config) // 读取新的配置
fmt.Printf("最新配置: %+v\n", c)
cond.L.Unlock()
}
}()
select {}
}
【问】为什么上面定义的方法中都是 *int32,而不是 int32?
【答】因为原子操作需要的是被操作值的指针,而不是这个值本身;如果传入的是int32那么它的值会被复制,与原来的值就没有关系了。
总结 atomic 包的使用细节:
- atomic 包更适合一些对性能十分敏感、并发量较大且读多写少的场合。如果要想保证原子操作,尽可能的要使用 atomic 提供的方法。
- atomic 操作的对象是一个地址,需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法。
【sync.Once】
sync.Once 可以用来执行且仅仅执行一次动作,常常用于单例对象的初始化场景。Once类型使用互斥锁和原子操作实现了功能,sync.Once 只暴露了一个方法 Do,可以多次调用 Do 方法,但是只有第一次调用 Do 方法时 f 参数才会执行,这里的 f 是一个无参数无返回值的函数。
//$GOROOT/src/sync/once.go
type Once struct {
done uint32 //记录Do方法被调用的次数,只会是0或者1,一旦Do方法的首次调用完成,它的值就会从0变为1
m Mutex //互斥锁
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
看一个简单的例子:
//basic/go03/sync2/once.go
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
// 第一个初始化函数
f1 := func() {
fmt.Println("第1次输出")
}
once.Do(f1) //第1次输出
// 第二个初始化函数
f2 := func() {
fmt.Println("第2次输出")
}
once.Do(f2) //无输出
}
使用sync.Once实现单例模式
在 PHP或者Java中,单例模式是指一个类只允许创建一个对象(或者实例),单例模式可以保证类的对象全局唯一,可以参考之前我写的关于PHP中使用单例模式的例子:PHP设计模式之单例模式_php实现单例模式_浮尘笔记的博客-CSDN博客
在Go语言中实现一个单例模式就更简单了,因为不需要手动判断实例是否已存在,使用sync.Once就可以。代码如下:
//basic/go03/sync2/once2.go
package main
import (
"fmt"
"sync"
"unsafe"
)
type Singleton struct {
data string
}
var singleInstance *Singleton
var once sync.Once
func GetSingletonInstance() *Singleton {
once.Do(func() {
fmt.Println("创建实例")
singleInstance = new(Singleton)
})
return singleInstance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
obj := GetSingletonInstance()
//循环中每次输出的实例的地址都是相同的,而且上面的"创建实例"只会输出一次
fmt.Println(unsafe.Pointer(obj))
wg.Done()
}()
}
wg.Wait()
}
使用 sync.Once细节:
由于Do方法只会在 f 函数执行结束之后才会把done字段的值变为1,如果 f 函数需要很长时间才能执行结束 或者 根本就不会结束(比如守护任务),那么就有可能导致相关 goroutine同时阻塞在锁定该Once值的互斥锁m的那行代码上。
Do方法在 f 函数执行结束后,使用了defer语句对done字段赋值,因此不论 f 函数是否执行成功或者引发panic,最终done字段的值都会变为1,那么也就无法使用同一个Once值重新执行了。
源代码:https://gitee.com/rxbook/go-demo-2023/tree/master/basic/go03/sync2