《Go语言并发之道》
- 第一章: 并发概述
- 第二章:对你的代码建模:通信顺序进程
- 第三章:GO语言并发组件
由于不怎么熟悉GO,只做简单的摘录,敲敲示例代码
鸭子类型:当看到一只鸟走起来像鸭子、游泳起来像鸭子、叫起来也像鸭子,那么这只鸟就可以被称为鸭子。
面试扣分点:什么是鸭子类型
Go-FAQ 翻译 | Seeker
第一章: 并发概述
令人尴尬的并行问题:
Many may wonder the etymology of the term “embarrassingly”. In this case, embarrassingly has nothing to do with embarrassment; in fact, it means an overabundance—here referring to parallelization problems which are “embarrassingly easy”.
cpu并行算法和gpu并行_令人尴尬的并行算法介绍
Web-Scale IT 我之见!
竞争条件
当两个或多个操作必须按正确的顺序执行,而程序并未保证这个顺序,就会发生竞争条件。
// 循环执行示例程序,记录各个结果出现次数
func main() {
var cnt [2]int
for i := 0; i < 10000000; i++ {
var data int
go func() {
data++
}()
if data == 0 {
cnt[data]++
}
}
fmt.Printf("cnt:%v", cnt)
}
// 执行三次
go run compete.go
cnt:[9999977 0]
cnt:[9999992 0]
cnt:[9999980 0]
在大多数情况下,引入数据竞争的原因是因为开发人员用顺序性的思维来思考问题。他们假设,某一行代码逻辑会在另一行代码逻辑之前先运行。我发现,有时候想象在两个操作之间会间隔很长一段时间是很有帮助的。
你应该始终以逻辑正确性为目标。在代码中引入休眠可以方便调试程序,但这并不能称之为一个解决方案。
原子性
当某些东西被认为是原子的,或者具有原子性的时候,这意味着在它运行的环境中,它是不可分割的或不可中断的。
在考虑原子性时,经常第一件需要做的事就是定义上下文或范围,然后再考虑这些操作是否是原子性的。一切都应当遵循这个原则。
死锁
死锁程序是所有并发进程彼此等待的程序。在这种情况下,如果没有外界的干预,这个程序将永远无法恢复。
示例程序
type value struct {
mu sync.Mutex
val int
}
func main() {
var wg sync.WaitGroup
// 获取锁后睡眠两秒再次获取另一个锁
printSum := func(v1, v2 *value) {
defer wg.Done()
v1.mu.Lock()
defer v1.mu.Unlock()
time.Sleep(2 * time.Second)
v2.mu.Lock()
defer v2.mu.Unlock()
fmt.Printf("sum=%v\n", v1.val+v2.val)
}
var a, b value
wg.Add(2)
go printSum(&a, &b)
go printSum(&b, &a)
wg.Wait()
}
运行输出
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0000a6018)
/usr/lib/go-1.13/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc0000a6010)
/usr/lib/go-1.13/src/sync/waitgroup.go:130 +0x64
main.main()
/root/mit6.824/6.824/src/expr/deadlock.go:30 +0x122
goroutine 18 [semacquire]:
sync.runtime_SemacquireMutex(0xc0000a6034, 0x1300, 0x1)
/usr/lib/go-1.13/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0000a6030)
/usr/lib/go-1.13/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
/usr/lib/go-1.13/src/sync/mutex.go:81
main.main.func1(0xc0000a6020, 0xc0000a6030)
/root/mit6.824/6.824/src/expr/deadlock.go:22 +0x1f4
created by main.main
/root/mit6.824/6.824/src/expr/deadlock.go:28 +0xea
goroutine 19 [semacquire]:
sync.runtime_SemacquireMutex(0xc0000a6024, 0x1300, 0x1)
/usr/lib/go-1.13/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0000a6020)
/usr/lib/go-1.13/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
/usr/lib/go-1.13/src/sync/mutex.go:81
main.main.func1(0xc0000a6030, 0xc0000a6020)
/root/mit6.824/6.824/src/expr/deadlock.go:22 +0x1f4
created by main.main
/root/mit6.824/6.824/src/expr/deadlock.go:29 +0x114
exit status 2
root@ubuntu ~/m/6/s/expr (master) [1]# go run deadlock.go
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0000a6018)
/usr/lib/go-1.13/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc0000a6010)
/usr/lib/go-1.13/src/sync/waitgroup.go:130 +0x64
main.main()
/root/mit6.824/6.824/src/expr/deadlock.go:30 +0x122
goroutine 18 [semacquire]:
sync.runtime_SemacquireMutex(0xc0000a6034, 0x1300, 0x1)
/usr/lib/go-1.13/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0000a6030)
/usr/lib/go-1.13/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
/usr/lib/go-1.13/src/sync/mutex.go:81
main.main.func1(0xc0000a6020, 0xc0000a6030)
/root/mit6.824/6.824/src/expr/deadlock.go:22 +0x1f4
created by main.main
/root/mit6.824/6.824/src/expr/deadlock.go:28 +0xea
goroutine 19 [semacquire]:
sync.runtime_SemacquireMutex(0xc0000a6024, 0x1300, 0x1)
/usr/lib/go-1.13/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0000a6020)
/usr/lib/go-1.13/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
/usr/lib/go-1.13/src/sync/mutex.go:81
main.main.func1(0xc0000a6030, 0xc0000a6020)
/root/mit6.824/6.824/src/expr/deadlock.go:22 +0x1f4
created by main.main
/root/mit6.824/6.824/src/expr/deadlock.go:29 +0x114
exit status 2
一个逻辑上“完美”的死锁将需要正确地同步。
Coffman死锁条件如下:
相互排斥
并发进程同时拥有资源的独占性
等待条件
并发进程必须同时拥有一个资源并等待额外的资源。
没有抢占
并发进程拥有的资源只能被该进程释放即可满足这个条件
循环等待
一个并发进程(P1)必须等待其余并发进程(P2),这些并发进程同时也在等待进程(P1)
活锁
活锁是正在主动执行并发操作的程序,但是这些操作无法向前推进程序的状态。
func main() {
cadence := sync.NewCond(&sync.Mutex{})
go func() {
for range time.Tick(1 * time.Millisecond) { // 定时发布广播
cadence.Broadcast()
}
}()
takeStep := func() {
cadence.L.Lock()
cadence.Wait() // 等待唤醒
cadence.L.Unlock()
}
tryDir := func(dirName string, dir *int32, out *bytes.Buffer) bool {
fmt.Fprintf(out, " %v", dirName)
atomic.AddInt32(dir, 1)
takeStep()
if atomic.LoadInt32(dir) == 1 { // 只有一个进程选择该方向
fmt.Fprintf(out, ".success!")
}
takeStep()
atomic.AddInt32(dir, -1)
return false
}
var left, right int32
tryLeft := func(out *bytes.Buffer) bool {
return tryDir("left", &left, out)
}
tryRight := func(out *bytes.Buffer) bool {
return tryDir("right", &right, out)
}
walk := func(walking *sync.WaitGroup, name string) {
var out bytes.Buffer
defer walking.Done()
defer func() { // 需放在Done后面保证一定输出
fmt.Println(out.String())
}()
fmt.Fprintf(&out, "%v is try to scoot:", name)
for i := 0; i < 5; i++ {
if tryLeft(&out) || tryRight(&out) {
return
}
}
fmt.Fprintf(&out, "\n%v tosses her hands up in exasperation!", name)
}
var peopleInHallway sync.WaitGroup
peopleInHallway.Add(2)
go walk(&peopleInHallway, "Alice")
go walk(&peopleInHallway, "Bob")
peopleInHallway.Wait()
}
/*
Bob is try to scoot: left right left right left right left right left right
Bob tosses her hands up in exasperation!
Alice is try to scoot: left right left right left right left right left right
Alice tosses her hands up in exasperation!
*/
饥饿
饥饿是在任何情况下,并发进程都无法获得执行工作所需的所有资源。
func main() {
var wg sync.WaitGroup
var sharedLock sync.Mutex
const runtime = 1 * time.Second
greedWorker := func() {
defer wg.Done()
var count int
for begin := time.Now(); time.Since(begin) < runtime; {
sharedLock.Lock()
time.Sleep(3 * time.Nanosecond)
sharedLock.Unlock()
count++
}
fmt.Printf("Greedy worker was able to execute %v work loops\n", count)
}
politeWorker := func() {
defer wg.Done()
var count int
for begin := time.Now(); time.Since(begin) < runtime; {
sharedLock.Lock()
time.Sleep(1 * time.Nanosecond)
sharedLock.Unlock()
sharedLock.Lock()
time.Sleep(1 * time.Nanosecond)
sharedLock.Unlock()
sharedLock.Lock()
time.Sleep(1 * time.Nanosecond)
sharedLock.Unlock()
count++
}
fmt.Printf("Polite worker was able to execute %v work loops\n", count)
}
wg.Add(2)
go greedWorker()
go politeWorker()
wg.Wait()
}
/*
Greedy worker was able to execute 831906 work loops
Polite worker was able to execute 564963 work loops
*/
第二章:对你的代码建模:通信顺序进程
并发属于代码,并行属于一个运行中的程序
- 首先,我们并没有编写并行的代码,只有我们希望可以并行执行的并发代码。另外,并行是我们程序运行时的属性,而不是我们的代码。
- 其次,就是可能对我们所写的并发代码是否真的并行执行,保持不知情。这只有在我们的程序模型之下的抽象层实现:并发原语,程序的运行时,操作系统,操作系统所运行的平台(运行在hypervisor,容器和虚拟机时),以及最终的CPU,这些抽象给予我们区分并发与并行的能力,最终给了我们灵活而有力的表达。让我们回到这个问题本身。
- 第三个也是最后一个有意思的事情是并行是一个时间或者上下文的函数。
通常来说,一种语言会将它们的抽象链结束在系统线程和内存访问同步的层级。GO语言采用了一种不同的路线,并使用goroutine和channel来代替这些概念
不要通过共享内存进行通信,而是通过通信来共享内存
GO语言的并行性哲学可以这样总结:追求简洁,尽量使用channel,并且认为goroutine的使用是没有成本的。
第三章:GO语言并发组件
GO语言中的goroutine是独一无二的(尽管其他的一些语言有类似的并发原语)。它们不是OS线程,也不是绿色线程(由语言运行时管理的线程),它们是一个更高级别的抽象,称为协程。协程是一种非抢占式的简单并发子goroutine(函数,闭包或方法),也就是说,它们不能被中断。取而代之的是,协程有多个point,允许暂停或重新进入。
GO语言的主机托管机制是一个名为M:N调度器的实现,这意外这它将M个绿色线程映射到N个OS线程,然后将goroutine运行在绿色线程上。当我们的goroutine数量超过可用的绿色线程时,调度程序将处理分布在可用线程上的goroutine,并确保当这些goroutine阻塞时,其他的goroutine可以运行。
GO语言遵循一个称为fork-join的并发模型。fork这个词指的是在程序中的任意一个节点,可以将子节点与父节点同时运行。join这个词指的是,在将来某个时候,这些并发的执行分支将会合并在一起。joint point是保证程序正确性和消除竞争条件的关键。
// 证明goroutine在它们所创建的相同地址空间内执行
func main() {
var wg sync.WaitGroup
str := "hello"
wg.Add(1)
go func() {
defer wg.Done()
str = "world"
}()
wg.Wait()
fmt.Println(str)
}
// world
空goroutine大小
func main() {
memConsumed := func() uint64 {
runtime.GC()
var s runtime.MemStats
runtime.ReadMemStats(&s)
return s.Sys
}
var c <-chan interface{}
var wg sync.WaitGroup
noop := func() {
wg.Done()
<-c
}
const numGoroutines int = 1e5
wg.Add(numGoroutines)
before := memConsumed()
for i := 0; i < numGoroutines; i++ {
go noop()
}
wg.Wait()
after := memConsumed()
fmt.Printf("before:%vkb after:%vkb consume:%.3fkb", before/1000, after/1000, float64(after-before)/float64(numGoroutines)/1000)
}
// before:69994kb after:281516kb consume:2.115kb
上下文切换时间
func BenchmarkContextSwitch(b *testing.B) {
var wg sync.WaitGroup
begin := make(chan struct{})
c := make(chan struct{})
var token struct{}
sender := func() {
defer wg.Done()
<-begin
for i := 0; i < b.N; i++ {
c <- token
}
}
receiver := func() {
defer wg.Done()
<-begin
for i := 0; i < b.N; i++ {
<-c
}
}
wg.Add(2)
go sender()
go receiver()
b.StartTimer()
close(begin)
wg.Wait()
}
go test -bench=. -cpu=1 bench_test.go
goos: linux
goarch: amd64
BenchmarkContextSwitch 8860370 157 ns/op
PASS
ok command-line-arguments 1.534s
sync包
你可以将WaitGroup视为一个并发-安全的计数器:调用通过传入的整数执行Add方法增加计数器的增量,并调用Done方法对计数器进行递减,Wait方法阻塞,直到计数器为零。注意,Add调用是在它们帮助跟踪的goroutine之外完成的。
读写锁
func main() {
producer := func(wg *sync.WaitGroup, l sync.Locker) {
defer wg.Done()
for i := 5; i >= 0; i-- {
l.Lock()
l.Unlock()
time.Sleep(1)
}
}
observer := func(wg *sync.WaitGroup, l sync.Locker) {
defer wg.Done()
l.Lock()
l.Unlock()
}
test := func(count int, mutex, rwMutex sync.Locker) time.Duration {
var wg sync.WaitGroup
wg.Add(count + 1)
beginTestTime := time.Now()
go producer(&wg, mutex)
for i := count; i > 0; i-- {
go observer(&wg, rwMutex)
}
wg.Wait()
return time.Since(beginTestTime)
}
tw := tabwriter.NewWriter(os.Stdout, 0, 1, 2, ' ', 0)
defer tw.Flush()
var m sync.RWMutex
fmt.Fprintf(tw, "Reader\tRWMutex\tMutex\n")
for i := 0; i < 20; i++ {
count := int(math.Pow(2, float64(i)))
fmt.Fprintf(tw, "%d\t%v\t%v\n", count, test(count, &m, m.RLocker()), test(count, &m, &m))
}
}
/*
Reader RWMutex Mutex
1 11.421µs 2.805µs
2 4.819µs 2.685µs
4 3.556µs 3.125µs
8 13.856µs 3.867µs
16 10.039µs 5.53µs
32 37.981µs 9.057µs
64 60.037µs 114.142µs
128 143.297µs 38.381µs
256 161.08µs 58.189µs
512 343.771µs 141.238µs
1024 474.765µs 727.275µs
2048 1.106501ms 987.48µs
4096 1.100992ms 1.41115ms
8192 2.010095ms 2.5819ms
16384 3.592384ms 4.176407ms
32768 8.957634ms 7.668959ms
65536 19.622861ms 14.164301ms
131072 31.256883ms 36.022752ms
262144 64.181958ms 59.230185ms
524288 120.306972ms 113.102032ms
*/
看不懂互斥锁与读写锁的时间对比是啥用意
cond:一个goroutine的集合点,等待或发布一个event。
注意,调用Wait不只是阻塞,它挂起了当前的goroutine,允许其他的goroutine在OS线程上运行。当你调用Wait时,会发生一些其他事情:进入Wait后,在Cond变量的Locker上调用Unlock方法;在退出Wait时,在Cond变量的Locker上执行Lock方法。
Signal示例
func main() {
c := sync.NewCond(&sync.Mutex{})
queue := make([]interface{}, 0, 10)
removeFromQueue := func(delay time.Duration) {
time.Sleep(delay)
c.L.Lock()
queue = queue[:1]
fmt.Println("removed from queue")
c.L.Unlock()
c.Signal()
}
for i := 0; i < 10; i++ {
c.L.Lock()
for len(queue) == 2 {
c.Wait()
}
fmt.Println("add to queue")
queue = append(queue, struct{}{})
go removeFromQueue(1 * time.Second)
c.L.Unlock()
}
}
/*
add to queue
add to queue
removed from queue
add to queue
removed from queue
add to queue
removed from queue
add to queue
removed from queue
add to queue
removed from queue
add to queue
removed from queue
add to queue
removed from queue
add to queue
removed from queue
add to queue
*/
BroadCast示例
type Button struct {
Clicked *sync.Cond
}
func main() {
button := Button{Clicked: sync.NewCond(&sync.Mutex{})}
subscribe := func(c *sync.Cond, fn func()) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
wg.Done()
c.L.Lock()
defer c.L.Unlock()
c.Wait()
fn()
}()
wg.Wait()
}
var clickRegister sync.WaitGroup
clickRegister.Add(3)
subscribe(button.Clicked, func() {
fmt.Println("Maximizing")
clickRegister.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("Display")
clickRegister.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("Mouse")
clickRegister.Done()
})
button.Clicked.Broadcast()
clickRegister.Wait()
}
/*
Mouse
Maximizing
Display
*/
once
顾名思义,sync.Once是一种类型,它在内部使用一些sync原语,以确保即使在不同的goroutine上也只会调用一次Do方法传递进来的函数。
grep -ir sync.Once $(go env GOROOT)/src | wc -l
112
示例代码
func main() {
var count int
increment := func() {
count++
fmt.Println("call increment function")
}
var once sync.Once
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
once.Do(increment)
}()
}
wg.Wait()
fmt.Printf("count is %d.\n", count)
}
/*
call increment function
count is 1.
*/
另一个示例代码
func main() {
var count int
increment := func() {
count++
}
decrement := func() {
count--
}
var once sync.Once
once.Do(increment)
once.Do(decrement)
fmt.Printf("count is %d\n", count)
}
/*
count is 1
*/
sync.Once只计算调用Do方法的次数,而不是多少次唯一调用Do方法。
Pool
sync.Pool是Pool模式的并发安全实现,在较高的层次上,Pool模式是一种创建和提供可供使用的固定数量实例或Pool实例的方法。它通常用于创建昂贵的场景(数据库连接),以便只创建固定数量的实例,但不确定数量的操作仍然可用请求访问这些场景(什么鬼翻译)。对于Go语言的sync.Pool,这种数据类型可以被多个goroutine安全地使用
示例1
func main() {
var numCalcsCreated int
calcPool := &sync.Pool{New: func() interface{} {
numCalcsCreated += 1
mem := make([]byte, 1024)
return &mem
}}
// 用4KB初始化pool
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
const numWorkers = 1024 * 1024
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := numWorkers; i > 0; i-- {
go func() {
defer wg.Done()
mem := calcPool.Get().(*[]byte) // 断言
defer calcPool.Put(mem)
}()
}
wg.Wait()
fmt.Printf("%d calculators were created.\n", numCalcsCreated)
}
/*
8 calculators were created.
*/
示例代码2
func connectToService() interface{} {
time.Sleep(1 * time.Second)
return struct{}{}
}
func startNetworkDaemon() *sync.WaitGroup { // 开启后台服务协程,监听8080端口
var wg sync.WaitGroup
wg.Add(1)
go func() {
server, err := net.Listen("tcp", "localhost:8080")
if err != nil {
log.Fatalf("cannot listen: %v", err)
}
defer server.Close()
wg.Done()
for {
conn, err := server.Accept()
if err != nil {
log.Printf("cannot accept connection:%v", err)
}
connectToService()
fmt.Fprintln(conn, "")
conn.Close()
}
}()
return &wg
}
func warmServiceConnCache() *sync.Pool { // 创建连接池
p := &sync.Pool{
New: connectToService,
}
for i := 0; i < 10; i++ { // 初始化连接池,放入10个连接
p.Put(p.New())
}
return p
}
func startNetworkDaemonWithPool() *sync.WaitGroup { // 开启后台服务协程,监听8080端口,使用连接池
var wg sync.WaitGroup
wg.Add(1)
go func() {
connPool := warmServiceConnCache()
server, err := net.Listen("tcp", "localhost:8080")
if err != nil {
log.Fatalf("cannot listen: %v", err)
}
defer server.Close()
wg.Done()
for {
conn, err := server.Accept()
if err != nil {
log.Printf("cannot accept connection:%v", err)
}
svcConn := connPool.Get()
fmt.Fprintln(conn, "")
connPool.Put(svcConn)
conn.Close()
}
}()
return &wg
}
func init() {
// daemonStarted := startNetworkDaemon()
daemonStarted := startNetworkDaemonWithPool()
daemonStarted.Wait()
}
func BenchmarkNetworkRequest(b *testing.B) {
for i := 0; i < b.N; i++ {
conn, err := net.Dial("tcp", "localhost:8080") // 客户端程序
if err != nil {
b.Fatalf("cannot dial host:%v", err)
}
if _, err := ioutil.ReadAll(conn); err != nil { // 一直读取直到文件末尾
b.Fatalf("cannot read:%v", err)
}
conn.Close()
}
}
/*
goos: linux
goarch: amd64
BenchmarkNetworkRequest-8 10 1001305398 ns/op
PASS
ok command-line-arguments 11.023s
goos: linux
goarch: amd64
BenchmarkNetworkRequest-8 17644 1448938 ns/op
PASS
ok command-line-arguments 60.237s
*/
当你使用Pool工作时,记住以下几点:
- 当实例化sync.Pool,使用new方法创建一个成员变量,在调用时是线程安全的
- 当你收到一个来自Get的实例时,不要对所接收的对象的状态