官方文档:https://pkg.go.dev/sync
临界区
临界区(critical section)是指包含有共享数据的一段代码,这些代码可能被多个线程访问
或修改。临界区的存在就是为了保证当有一个线程在临界区内执行的时候,不能有其他任何线程被允许在临界区执行。
每个临界区都有相应的进入区(entry section)和退出区(exit section),可以按图 2.3方式表示。
设想有 A,B 两个线程执行同一段代码,则在任意时刻至多只能有一个线程在执行临界区内的代码。即,如果 A 线程正在临界区执行,B 线程则只能在进入区等待。只有当 A 线程执行完临界区的代码并退出临界区,原先处于等待状态的 B 线程才能继续向下执行并进入临界区。
如果只有一个主线程,输入结果正常
package main
import "fmt"
var x = 0
func main() {
for i := 0; i < 10000; i++ {
x = x + 1
}
for i := 0; i < 10000; i++ {
x = x + 1
}
fmt.Printf("x value is %d\n", x)
}
输出结果:x value is 20000,符合预期
改为多协程访问,代码如下
package main
import (
"fmt"
"sync"
)
var x = 0
var wg sync.WaitGroup
func add() {
for i := 0; i < 10000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Printf("x value is %d\n", x)
}
输入结果:x value is 12718(每次执行不一样), 不符合预期
原因分析:
x = x + 1
- 首先获得x的值
- 计算 x + 1
- 把步骤2的计算结果赋值给 x
假设a、b 2个协程拿到的x值都是99,当a协程进行步骤2时,b协程进行到步骤3,b协程把步骤2的值100给了x,a又重新把步骤2的值100给x,把b协程的值覆盖掉。导致x的值99经过2次加1是100而不是期望的101。
互斥锁
互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。
sync.Mutex定义了两个方法:Lock和 Unlock。所有在 Lock 和 Unlock 之间的代码,都只能由一个 Go 协程执行,于是就可以避免竞态条件。
改进后的程序如下
package main
import (
"fmt"
"sync"
)
var x = 0
var mutex sync.Mutex
var wg sync.WaitGroup
func add() {
for i := 0; i < 10000; i++ {
mutex.Lock()
x = x + 1
mutex.Unlock()
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Printf("x value is %d\n", x)
}
每次输出结果:x value is 20000,符合预期。
读写锁
读操作可并发重入,写操作是互斥的。这意味着多个线程可以同时读数据,但写数据时需要获得一个独占的锁。当写者写数据时,其它写者或读者需要等待,直到这个写者完成写操作。读写锁在Go语言中使用sync包中的RWMutex类型。
读写锁优先级策略
读写锁可以有不同的操作模式优先级:
- 读操作优先锁:提供了最大并发性,但在锁竞争比较激烈的情况下,可能会导致写操作饥饿。这是由于只要还有一个读线程持锁,写线程就拿不到锁。多个读者可以立刻拿到锁,这意味着一个写者可能一直在等锁,期间新的读者一直可以拿到锁。极端情况下,写者线程可能会一直等锁,直到所有一开始就拿到锁的读者释放锁。读者的可以是弱优先级的,如前文所述,也可以是强优先级的,即只要写者释放锁,任何等待的读者总能先拿到。
- 写操作优先锁:如果队列中有写者在等锁,则阻止任何新读者拿锁,来避免了写操作饥饿的问题。一旦所有已经开始的读操作完成,等待的写操作立即获得锁。和读操作优先锁相比,写操作优先锁的不足在于在写者存在的情况下并发度低。内部实现需要两把互斥锁。
- 未指定优先级锁:不提供任何读/写的优先级保证。
Go 读写锁(sync.RWMutex)不会导致写操作饥饿。读写锁允许多个 goroutine 同时读取共享资源,但只有一个 goroutine 可以进行写操作。当有写操作等待时,读操作会被阻塞,直到写操作完成。这样可以保证写操作不会被无限期地延迟,从而避免了写操作饥饿的问题。同时,读写锁还支持优先级反转,即当有写操作等待时,新的读操作也会被阻塞,以确保写操作尽快得到执行。
读写锁优势是在读多写少的情况,举例如下:
package main
import (
"fmt"
"sync"
"time"
)
var x = 10
var wg sync.WaitGroup
var mutex sync.Mutex
func write() {
mutex.Lock()
time.Sleep(1 * time.Millisecond) // 模拟写耗时1毫秒
x = x + 1
mutex.Unlock()
wg.Done()
}
func read() {
mutex.Lock()
time.Sleep(time.Millisecond) // 模拟读耗时1毫秒
mutex.Unlock()
wg.Done()
}
func main() {
// 统计开始时间
time1 := time.Now()
// 开10个协程写
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
// 开1000个协程读
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
fmt.Println("x最终值为:", x)
// 统计结束时间
time2 := time.Now()
fmt.Printf("总共耗时:%v\n", time2.Sub(time1)) // 结束时间-开始时间
}
输入结果如下:
x最终值为: 20
总共耗时:15.7080693s
package main
import (
"fmt"
"sync"
"time"
)
var x = 10
var wg sync.WaitGroup
var rwMutex sync.RWMutex
func write() {
rwMutex.Lock() // 写锁都用Lock
time.Sleep(1 * time.Millisecond) // 模拟写耗时1毫秒
x = x + 1
rwMutex.Unlock()
wg.Done()
}
func read() {
rwMutex.RLock() // 读锁用RLock
time.Sleep(time.Millisecond) // 模拟读耗时1毫秒
rwMutex.RUnlock()
wg.Done()
}
func main() {
// 统计开始时间
time1 := time.Now()
// 开10个协程写
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
// 开1000个协程读
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
fmt.Println("x最终值为:", x)
// 统计结束时间
time2 := time.Now()
fmt.Printf("总共耗时:%v\n", time2.Sub(time1)) // 结束时间-开始时间
}
输出结果如下:
x最终值为: 20
总共耗时:155.8851ms
sync.WaitGroup
请参考:https://blog.csdn.net/weixin_37909391/article/details/130853859
sync.Once
sync.Once只有一个方法,签名如下
func (o *Once) Do(f func())
当且仅当 Do 是第一次为 Once 的实例调用时,Do 才调用函数 f。
如果 once.Do(f) 被多次调用,只有第一次调用会调用 f,即使 f 在每次调用中都有不同的值。每个要执行的函数都需要一个新的 Once 实例。
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
结果:Only once
sync.Map
方法 | 说明 |
---|---|
func (m *Map) CompareAndDelete(key, old any) (deleted bool) | 少用,略 |
func (m *Map) CompareAndSwap(key, old, new any) bool | 少用,略 |
func (m *Map) Delete(key any) | Delete 删除键的值。 |
func (m *Map) Load(key any) (value any, ok bool) | Load 返回存储在映射中的键值,如果不存在值则返回 nil。 ok 结果表明是否在map中找到了值。 |
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) | LoadAndDelete 删除键的值,返回以前的值(如果有)。加载的结果报告密钥是否存在。 |
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) | LoadOrStore 返回键的现有值(如果存在)。否则,它存储并返回给定的值。如果值已加载,则加载结果为 true,如果已存储,则为 false。 |
func (m *Map) Range(f func(key, value any) bool) | Range 依次为映射中存在的每个键和值调用 f。如果 f 返回 false,则 range 停止迭代。 |
func (m *Map) Store(key, value any) | Store 设置键的值。 |
func (m *Map) Swap(key, value any) (previous any, loaded bool) | 少用,略 |
需要注意的是,由于 sync.Map 内部使用了一些技巧来实现并发安全,因此它的一些方法可能会比普通的 map 操作更慢。在性能要求较高的场景中,可以考虑使用其他的并发安全的数据结构,如 sync.Pool、atomic.Value 等。
package main
import (
"fmt"
"sync"
)
var m = make(map[int]int)
func main() {
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
m[n] = n * 10
fmt.Printf("key为:%d,value为:%d\n", n, m[n])
wg.Done()
}(i)
}
wg.Wait()
// panic:fatal error: concurrent map writes
}
package main
import (
"fmt"
"sync"
)
var m = sync.Map{}
func main() {
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
m.Store(n, n*10)
res, _ := m.Load(n)
fmt.Printf("key为:%d,value为:%d\n", n, res)
wg.Done()
}(i)
}
wg.Wait()
}
结果如下:
key为:0,value为:0
key为:19,value为:190
key为:5,value为:50
key为:6,value为:60
key为:7,value为:70
key为:4,value为:40
key为:8,value为:80
key为:9,value为:90
key为:10,value为:100
key为:11,value为:110
key为:3,value为:30
key为:12,value为:120
key为:15,value为:150
key为:13,value为:130
key为:18,value为:180
key为:14,value为:140
key为:17,value为:170
key为:1,value为:10
key为:16,value为:160
key为:2,value为:20