WaitGroup
我们可以通过 sync.WaitGroup
将原本顺序执行的代码在多个 Goroutine 中并发执行,加快程序处理的速度。
我们来看一下sync.WaitGroup
的结构体:
type WaitGroup struct {
//保证WaitGroup不会被开发者通过再赋值的方式复制
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers only guarantee that 64-bit fields are 32-bit aligned.
// For this reason on 32 bit architectures we need to check in state()
// if state1 is aligned or not, and dynamically "swap" the field order if
// needed.
state1 uint64
state2 uint32
}
counter
是WaitGroup
里面需要等待的Goroutine
的数量
waiter
和sema
是信号量。
在分析源码之后我们得到以下结论:
-
sync.WaitGroup
必须在Wait
方法返回之后才能重新使用 -
Done
方法只是对Add
的简单封装。// Done decrements the WaitGroup counter by one. func (wg *WaitGroup) Done() { wg.Add(-1) }
我们可以向
Add
方法传入任意负数,快速将计数器归零以唤醒等待的Goroutine
。但是需要注意的是:计数器非负,如果计数器是负数,那么程序就会直接崩溃 -
可以有多个
Goroutine
等待当前计数器归零,这些Goroutine
会被同时唤醒
Once
sync.Once
可以保证Go语言程序运行期间某段代码只会执行一次。
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("I love zyq!!")
})
}
}
// I love zyq!!
我们来看一下它的结构体:=
type Once struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.
// The hot path is inlined at every call site.
// Placing done first allows more compact instructions on some architectures (amd64/386),
// and fewer instructions (to calculate offset) on other architectures.
done uint32
m Mutex
}
会通过done
确保函数不会执行第二次
再来看看Do
方法:
func (o *Once) Do(f func()) {
// Note: Here is an incorrect implementation of Do:
//
// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
// f()
// }
//
// Do guarantees that when it returns, f has finished.
// This implementation would not implement that guarantee:
// given two simultaneous calls, the winner of the cas would
// call f, and the second would return immediately, without
// waiting for the first's call to f to complete.
// This is why the slow path falls back to a mutex, and why
// the atomic.StoreUint32 must be delayed until after f returns.
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}
通过done
这个成员变量,我们可以达到以下的执行效果
- 如果传入的函数已经执行过,会直接返回
- 如果没有执行过,也就是
o.done
还是0,那么就会调用o.doSlow(f)
执行传入的函数。
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
这个函数的逻辑如下:
- 为当前
Goroutine
获取互斥锁 - 执行传入的无入参函数
- 运行延迟调用,将成员变量
done
的值更新为1
由源码我们可以知道一个使用的注意事项:
这个方法传入不同的函数只会执行第一次哦调用传入的函数
Cond
我们先来看一下它的用法:
var status int64
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}
该程序一共运行了11个Goroutine
- 10个通过
sync.Cond.Wait
等待特定条件满足 - 1个会调用
sync.Cond.Broadcast
唤醒所有陷入等待的Gourtine
上述代码会打印出10个listen
。
上面出现了这个代码,所以我们也顺便介绍一下:
func Notify(c chan<- os.Signal, sig ...os.Signal)
Notify函数让signal包将输入信号转发到c。如果没有列出要传递的信号,会将所有输入信号传递到c;否则只传递列出的输入信号。
结构体
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
noCopy
保证结构体不会在编译期间复制L
用户保护内部的notify
字段,Locker
接口类型的变量notify
是一个Goroutine
的链表,它是实现同步机制的核心结构copyChecker
用于禁止运行期间发生的复制
type notifyList struct {
wait uint32
notify uint32
lock mutex
head *sudog
tail *sudog
}
在 sync.notifyList
结构体中,head
和 tail
分别指向的链表的头和尾,wait
和 notify
分别表示当前正在等待的和已经通知到的 Goroutine 的索引。