目录
1.Go常见的并发模型
2.哪些方法安全读写共享变量
3.如何排查数据竞争问题
4.Go有哪些同步原语
1. Mutex (互斥锁)
2. RWMutex (读写互斥锁)
3. Atomic
3.1.使用场景
3.2.整型操作
3.3.指针操作
3.4.使用示例
4. Channel
使用场景
使用示例
5. sync.WaitGroup
使用场景
使用示例
内部结构
关键方法
源码解析
内部实现细节
6. sync.Once
使用场景
使用示例
实现原理
源码解析
详细解释
7. sync.Cond
使用场景
使用示例
实现原理
源码解析
Cond 结构体定义
Locker 接口
NewCond 函数
Wait 方法
Signal 方法
Broadcast 方法
8. sync.Pool
使用场景
使用场景
9. sync.Map
使用场景
使用示例
源码解析
10. context.Context
使用场景
使用示例
取消长时间运行的任务
设置请求的超时时间
传递请求范围的值
5.其他并发原语
1.Go常见的并发模型
2.哪些方法安全读写共享变量
3.如何排查数据竞争问题
4.Go有哪些同步原语
1. Mutex (互斥锁)
Mutex 是一种常用的锁机制,它可以用来保护临界区,确保同一时间只有一个 goroutine 访问共享资源。
package main
import (
"fmt"
"sync"
"time"
)
// 使用场景:
// 当多个 goroutines 需要访问和修改相同的变量或数据结构时,Mutex 可以用来确保每次只有一个 goroutine 在执行修改操作。
func main() {
var mu sync.Mutex
count := 0
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
mu.Lock()
count++
fmt.Printf("Count increased to: %d\n", count)
time.Sleep(time.Millisecond * 1) // 模拟耗时操作
mu.Unlock()
}()
}
wg.Wait()
fmt.Println("Final count:", count)
}
2. RWMutex (读写互斥锁)
RWMutex 允许多个读操作同时进行,但是一次只能有一个写操作。这可以提高程序的性能,特别是当读操作远远多于写操作时。
package main
import (
"fmt"
"sync"
)
// 使用场景:
// 当多个 goroutines 需要频繁读取共享数据,而写入操作较少时,RWMutex 可以提高并发性能。
func main() {
var mu sync.RWMutex
count := 0
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
if i == 5 {
mu.Lock()
count++
fmt.Printf("Write operation: Count increased to: %d\n", count)
mu.Unlock()
} else {
mu.RLock()
fmt.Printf("Read operation: Current count is: %d\n", count)
mu.RUnlock()
}
}()
}
wg.Wait()
fmt.Println("Final count:", count)
}
3. Atomic
Atomic 提供了一组原子操作,用于在不使用锁的情况下更新某些类型的变量,这对于避免锁的竞争和提高并发性能非常有用。它是实现锁的基石。
3.1.使用场景
sync/atomic
包非常适合于那些需要高并发且操作简单的情况,例如计数器、标志位等。通过使用原子操作,可以显著减少锁的使用,从而提高程序的整体性能。
3.2.整型操作
对于整型变量,sync/atomic
提供了以下方法:
- LoadInt32: 原子性地加载一个 int32 值。
- StoreInt32: 原子性地存储一个 int32 值。
- SwapInt32: 原子性地交换一个 int32 值并返回旧值。
- AddInt32: 原子性地增加一个 int32 值。
- SubInt32: 原子性地减少一个 int32 值。
- CompareAndSwapInt32: 原子性地比较并交换一个 int32 值。
对于其他整型(int64, uint32, uint64, uintptr),也有类似的 Load, Store, Swap, Add, Sub, 和 CompareAndSwap 方法。
3.3.指针操作
对于指针,sync/atomic
提供了以下方法:
- LoadPointer: 原子性地加载一个 unsafe.Pointer 值。
- StorePointer: 原子性地存储一个 unsafe.Pointer 值。
- SwapPointer: 原子性地交换一个 unsafe.Pointer 值并返回旧值。
- CompareAndSwapPointer: 原子性地比较并交换一个 unsafe.Pointer 值。
3.4.使用示例
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 原子更新整型变量
func main() {
count := int64(0)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
atomic.AddInt64(&count, 1)
fmt.Printf("Count increased to: %d\n", atomic.LoadInt64(&count))
time.Sleep(time.Millisecond * 50) // 模拟耗时操作
}()
}
wg.Wait()
fmt.Println("Final count:", atomic.LoadInt64(&count))
}
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
)
type MyStruct struct {
Name string
Age int
}
func main() {
count := int64(0)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
atomic.AddInt64(&count, 1)
fmt.Printf("Count increased to: %d\n", atomic.LoadInt64(&count))
time.Sleep(time.Millisecond * 50) // 模拟耗时操作
}()
}
wg.Wait()
fmt.Println("Final count:", atomic.LoadInt64(&count))
// 使用指针
var ptr unsafe.Pointer
atomic.StorePointer(&ptr, unsafe.Pointer(new(MyStruct)))
var wgPtr sync.WaitGroup
wgPtr.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wgPtr.Done()
myStruct := (*MyStruct)(atomic.LoadPointer(&ptr))
myStruct.Age++
fmt.Printf("Age increased to: %d\n", myStruct.Age)
time.Sleep(time.Millisecond * 50) // 模拟耗时操作
}()
}
wgPtr.Wait()
myStruct := (*MyStruct)(atomic.LoadPointer(&ptr))
fmt.Println("Final age:", myStruct.Age)
}
4. Channel
Channel 是 Go 中实现通信和同步的重要手段之一。它允许 goroutines 相互通信和同步。
使用场景
消息队列,数据传递,信号通知,任务编排,锁
使用示例
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
go func() {
val := <-ch // 从通道接收数据
fmt.Println("Received value:", val)
}()
ch <- 1 // 发送数据到通道
time.Sleep(time.Second)
}
5. sync.WaitGroup
WaitGroup 用于等待一组 goroutines 完成它们的工作。
使用场景
当你需要确保所有并发运行的 goroutines 都完成任务后再继续执行主 goroutine 时。
使用示例
package main
import (
"fmt"
"sync"
"time"
)
// 使用场景:
// 当你需要确保所有并发运行的 goroutines 都完成任务后再继续执行主 goroutine 时。
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
fmt.Println("goroutine 1 done")
}()
go func() {
defer wg.Done()
time.Sleep(time.Second)
fmt.Println("goroutine 2 done")
}()
wg.Wait()
fmt.Println("All goroutines finished")
}
内部结构
sync.WaitGroup
的内部结构主要包含以下几个关键部分:
-
state0
- 一个uint32
类型的变量,用于存储等待组的状态。这个状态包含了两个重要的信息:- 任务数量(即待完成的任务数量)。
- 等待者数量(即正在等待所有任务完成的 goroutines 数量)。
-
noCopy
- 一个sync/noCopy
类型的字段,用于标记WaitGroup
不应被复制。
关键方法
sync.WaitGroup
提供了几个关键的方法:
-
Add(delta int)
- 增加或减少待完成的任务数量。如果delta
为正,则增加;如果为负,则减少。当delta
为负且减少了任务数量使得任务数量变为零时,会唤醒所有的等待者。 -
Done()
- 减少任务数量,通常用于表示一个任务已经完成。这相当于调用Add(-1)
。 -
Wait()
- 阻塞当前 goroutine,直到所有任务完成。如果当前没有任务,那么Wait()
方法会立即返回。
源码解析
// 结构体
type WaitGroup struct {
// 一个 sync/noCopy 类型的字段,用于标记 WaitGroup 不应被复制
noCopy noCopy
// state0 保存两个 32 位值的组合:
// 低 32 位保存未完成的任务数量,
// 高 32 位保存等待者的数量。
state0 uint32
}
// Add方法
// Add 方法负责更新任务数量,并在适当的时候唤醒等待者:
func (wg *WaitGroup) Add(delta int) {
// 从 state0 中获取当前的任务数量和等待者数量。
old := atomic.LoadUint32(&wg.state0)
for {
// 解析出任务数量。
n := int(old)
n += delta
// 如果任务数量小于 0,则返回错误。
if n < 0 {
panic(negCount)
}
// 新的状态,包括更新后的任务数量和等待者数量。
new := uint32(n) // 仅更新任务数量,等待者数量不变。
// 使用 CAS (compare-and-swap) 更新 state0。
if atomic.CompareAndSwapUint32(&wg.state0, old, new) {
break
}
old = atomic.LoadUint32(&wg.state0) // 重试
}
// 如果任务数量为 0,则唤醒所有等待者。
if n == 0 {
notifyAll(&wg.state0)
}
}
// Done 方法
// Done 方法实际上是对 Add(-1) 的封装:
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
// Wait 方法
// Wait 方法阻塞当前 goroutine 直到所有任务完成:
func (wg *WaitGroup) Wait() {
// 增加等待者数量。
old := atomic.AddUint32(&wg.state0, waiters)
// 如果任务数量为 0,则立即返回。
if atomic.LoadUint32(&wg.state0)&pending == 0 {
return
}
// 等待直到任务完成。
wait(&wg.state0, old)
}
// 这里的 wait 函数是内部实现,它使用条件变量来等待,具体实现如下:
// wait blocks until the state is zero.
func wait(statep *uint32, old uint32) {
for {
// 如果任务数量为 0,则返回。
if atomic.LoadUint32(statep)&pending == 0 {
return
}
// 进入等待状态。
runtime_notifyWait(&statep, old)
old = atomic.LoadUint32(statep)
}
}
// runtime_notifyWait 和 notifyAll 是 Go 运行时提供的函数,用于实现条件变量的等待和通知功能。
内部实现细节
-
状态检查:
- 在
Add
方法中,通过原子操作检查当前任务数量是否为零。如果是零,则不需要做任何事情,直接返回。 - 如果不是零,则更新任务数量,并检查更新后的任务数量是否为零。如果是零,则唤醒所有等待者。
- 在
-
等待者处理:
- 在
Wait
方法中,当前 goroutine 成为等待者,并增加等待者数量。 - 如果此时任务数量为零,则立即返回。
- 如果任务数量不为零,则当前 goroutine 将进入阻塞状态,直到所有任务完成。
- 当任务完成时,等待者会被唤醒,并减少等待者数量。
- 在
-
原子操作:
- 使用
sync/atomic
包中的原子操作来更新状态,确保线程安全性。 - 通过
atomic.AddInt64
更新状态,通过atomic.LoadInt64
获取状态。
- 使用
-
条件变量:
- 使用
sync.runtime_notify
和sync.runtime_wait
来实现条件变量的功能,以等待或通知等待者。
- 使用
6. sync.Once
使用场景
Once 保证某个函数只被调用一次,即使有多个 goroutines 同时尝试调用该函数。
使用示例
package main
import (
"fmt"
"sync"
"time"
)
// 使用场景:
// 当你想要确保某个初始化操作只执行一次时。
func main() {
var once sync.Once
for i := 0; i < 10; i++ {
go func() {
once.Do(func() {
fmt.Println("This will be printed only once")
})
}()
}
time.Sleep(time.Second)
fmt.Println("Done")
}
实现原理
sync.Once
类型定义在一个 once
结构体中,该结构体包含以下字段:
- done - 一个
uint32
类型的原子变量,用来表示是否已经执行过操作。 - m - 一个互斥锁(Mutex),用于保护内部状态不被并发修改。
sync.Once
的主要方法有两个:Do
和 Done
。
- Do 方法接收一个函数作为参数,并保证这个函数仅被执行一次。
- Done 方法返回一个通道,当
Do
方法执行完毕后会关闭这个通道。
源码解析
type once struct {
// done 是一个原子变量,如果操作未执行则为 0,已执行则为 1。
done uint32
// m 是一个互斥锁,在执行动作时持有。
m Mutex
}
// Do 方法调用函数 f,如果这是第一次调用 Do 方法对于这个 Once 对象。
// 如果其他协程同时进入 Do,其中一个会执行 f,其他则会等待其完成。
func (o *once) Do(f func()) {
// 如果 done 已经为 1,则直接返回,不执行任何操作。
if atomic.LoadUint32(&o.done) == 1 {
return
}
// 否则尝试获取互斥锁。
o.m.Lock()
// 再次检查 done 是否为 1,防止其他 goroutine 已经完成了操作。
if atomic.LoadUint32(&o.done) != 1 {
// 如果不是,则执行函数 f 并将 done 设置为 1。
defer func() {
atomic.StoreUint32(&o.done, 1)
o.m.Unlock()
}()
f()
} else {
// 如果是,则释放锁并返回。
o.m.Unlock()
}
}
详细解释
- 原子读取: 使用 atomic.LoadUint32(&o.done) 快速检查 done 是否为 1。如果为 1,则说明已经执行过操作了,直接返回。
- 锁定: 如果 done 不为 1,则需要获取互斥锁来确保不会同时有多个 goroutine 执行相同的操作。
- 双重检查: 在获得锁之后再次检查 done,因为可能在等待锁的过程中另一个 goroutine 已经完成了操作。
- 执行函数: 如果 done 仍然为 0,则执行函数 f 并设置 done 为 1。
- 解锁: 完成操作后释放锁。
- 通过这种方式,sync.Once 能够确保函数 f 只会被执行一次,即使在高并发环境下也能保持这种行为不变。
7. sync.Cond
sync.Cond可以让一组的Coroutine都在满足特定条件时被唤醒
使用场景
利用等待/通知机制实现阻塞或者唤醒
使用示例
package main
import (
"fmt"
"sync"
"time"
)
func main() {
mu := &sync.Mutex{}
dataReady := false
data := "Hello, World!"
// 创建条件变量,传入互斥锁 mu
cond := sync.NewCond(mu)
// 生产者 goroutine
go func() {
time.Sleep(1 * time.Second)
mu.Lock()
fmt.Println("生产者:数据已准备好")
dataReady = true
//cond.Signal()
cond.Broadcast() // 数据准备好了,唤醒所有等待的消费者
mu.Unlock()
}()
// 消费者 goroutines
consumerCount := 3
for i := 0; i < consumerCount; i++ {
go func(id int) {
mu.Lock()
for !dataReady { // 如果数据没有准备好,则等待
fmt.Printf("消费者 %d:数据未准备好,正在等待...\n", id)
cond.Wait()
}
fmt.Printf("消费者 %d:数据已获取: %s\n", id, data)
mu.Unlock()
}(i)
}
time.Sleep(3 * time.Second) // 等待 goroutines 完成
fmt.Println("主goroutine结束")
}
实现原理
-
互斥锁 (
Mutex
或RWMutex
):sync.Cond
依赖于一个互斥锁(通常是一个Mutex
或RWMutex
),以确保在等待条件变量时,只有持有锁的 goroutine 才能调用Wait()
方法。 -
等待队列 (
waiterList
): 当一个 goroutine 调用Wait()
方法时,它会释放锁并被添加到等待队列中。当条件变量被Broadcast()
或Signal()
时,等待队列中的 goroutines 会被唤醒。 -
唤醒机制 (
Broadcast
和Signal
):Broadcast()
方法会唤醒等待队列中的所有 goroutines,而Signal()
方法只会唤醒等待队列中的一个 goroutine。
源码解析
在标准库 sync/cond.go
中
Cond 结构体定义
type Cond struct {
L Locker // 互斥锁接口
c chan struct{} // 用于信号的通道
}
L
是一个Locker
接口类型的指针,它可以是任何实现了Lock()
和Unlock()
方法的对象,如Mutex
或RWMutex
。c
是一个无缓冲的结构体通道,用于信号的传递。
Locker 接口
type Locker interface {
Lock()
Unlock()
}
这是一个简单的接口,它定义了锁的基本行为。
NewCond 函数
func NewCond(c Locker) *Cond {
return &Cond{c, make(chan struct{})}
}
New
函数接受一个 Locker
类型的参数并返回一个 Cond
实例。
Wait 方法
func (c *Cond) Wait() {
c.L.Lock()
c.L.Unlock()
c.c <- struct{}{}
}
实际上,Wait
方法的实现要比上述代码复杂得多。这里简化了实现以便更容易理解。在实际的 sync/cond.go
文件中,Wait
方法会释放锁、将当前 goroutine 加入等待队列,并阻塞当前 goroutine 直到接收到信号。
Signal 方法
func (c *Cond) Signal() {
select {
case c.c <- struct{}{}:
default:
}
}
Signal
方法尝试向 c
通道发送一个信号。如果通道未满,则发送成功;否则,由于通道无缓冲,Signal
方法将立即返回。
Broadcast 方法
func (c *Cond) Broadcast() {
for i := 0; i < len(c.c); i++ {
select {
case c.c <- struct{}{}:
default:
break
}
}
}
Broadcast
方法遍历 c.c
通道的长度,并尝试向通道发送信号。这会唤醒所有等待的 goroutines。
8. sync.Pool
使用场景
对象池化,TCP连接池、数据库连接池、Worker Pool
使用场景
package main
import (
"fmt"
"sync"
)
// 定义一个函数来演示使用 sync.Pool
func usePool() {
// 创建一个 sync.Pool
var pool sync.Pool
pool.New = func() interface{} {
return make([]int, 0, 100) // 初始容量为 100
}
// 从池中获取一个对象
slice := pool.Get().([]int)
// 使用 slice
for i := 0; i < 100; i++ {
slice = append(slice, i)
}
fmt.Println("Slice contents:", slice)
// 使用完毕后,将 slice 放回池中
pool.Put(slice)
}
func main() {
// 调用 usePool 函数
usePool()
// 再次使用相同的 pool
usePool()
}
9. sync.Map
是 Go 语言标准库中的一个线程安全的哈希表,它提供了并发安全的键值对存储功能。与传统的 map
不同,sync.Map
不需要显式的加锁来保证线程安全性,这使得它非常适合用于高并发环境下的键值对存储。
使用场景
-
并发读写:
- 当你需要一个可以被多个 goroutines 并发读写的键值对集合时,可以使用
sync.Map
。它可以在不需要手动加锁的情况下安全地读写数据。
- 当你需要一个可以被多个 goroutines 并发读写的键值对集合时,可以使用
-
缓存:
sync.Map
可以用来实现简单的缓存逻辑,特别是当缓存项的生命周期较短时。
-
配置管理:
- 在多线程环境中,
sync.Map
可以用来存储和更新配置信息
- 在多线程环境中,
使用示例
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// 创建一个 sync.Map 实例
syncMap := sync.Map{}
// 添加键值对
syncMap.Store("key1", "value1")
syncMap.Store("key2", "value2")
// 读取值
if value, ok := syncMap.Load("key1"); ok {
fmt.Println("Value of key1:", value)
} else {
fmt.Println("Key1 not found")
}
// 删除键值对
syncMap.Delete("key2")
// 遍历 sync.Map
syncMap.Range(func(key, value interface{}) bool {
fmt.Printf("Key: %v, Value: %v\n", key, value)
return true // 继续遍历
})
// 更新值
syncMap.Store("key1", "updated_value")
// 再次遍历 sync.Map
syncMap.Range(func(key, value interface{}) bool {
fmt.Printf("Key: %v, Value: %v\n", key, value)
return true // 继续遍历
})
// 使用 LoadOrStore
value, loaded := syncMap.LoadOrStore("key3", "default_value")
if loaded {
fmt.Println("Value already present:", value)
} else {
fmt.Println("Value added:", value)
}
// 使用 CompareAndSwap
oldValue := "updated_value"
newValue := "new_updated_value"
if swapped := syncMap.CompareAndSwap("key1", oldValue, newValue); swapped {
fmt.Println("Value updated:", newValue)
} else {
fmt.Println("Value not updated")
}
// 等待一段时间,让其他 goroutines 完成
time.Sleep(1 * time.Second)
}
源码解析
// entry 键值对中的值结构体
type entry struct {
p unsafe.Pointer // 指针,指向实际存储value值的地方
}
// Map 并发安全的map结构体
type Map struct {
mu sync.Mutex // 锁,保护read和dirty字段
read atomic.Value // 存仅读数据,原子操作,并发读安全,实际存储readOnly类型的数据
dirty map[interface{}]*entry // 存最新写入的数据
misses int // 计数器,每次在read字段中没找所需数据时,+1
// 当此值到达一定阈值时,将dirty字段赋值给read
}
// readOnly 存储map中仅读数据的结构体
type readOnly struct {
m map[interface{}]*entry // 其底层依然是个最简单的map
amended bool // 标志位,标识m.dirty中存储的数据是否和m.read中的不一样,flase 相同,true不相同
}
10. context.Context
Go语言中用于传递取消信号、截止时间、超时时间以及请求范围内的值的重要工具。
使用场景
-
取消长时间运行的任务:
- 当客户端或服务器想要取消一个长时间运行的任务时,可以发送一个取消信号到
context
中,从而让任务知道应该尽早停止。
- 当客户端或服务器想要取消一个长时间运行的任务时,可以发送一个取消信号到
-
设置超时时间:
- 可以通过
context
设置请求的最大持续时间,防止请求无限期地等待。
- 可以通过
-
传递请求范围的值:
- 可以在
context
中携带与请求相关的数据,例如认证信息、跟踪ID等。
- 可以在
-
资源管理:
- 在请求完成后释放资源,比如关闭数据库连接。
使用示例
取消长时间运行的任务
package main
import (
"context"
"fmt"
"time"
)
// LongRunningTask 模拟一个长时间运行的任务。
func LongRunningTask(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Task canceled.")
return
default:
fmt.Println("Working...")
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go LongRunningTask(ctx)
// 等待一段时间后取消任务
time.Sleep(5 * time.Second)
cancel()
// 主goroutine等待一段时间以确保子goroutine有时间退出
time.Sleep(1 * time.Second)
fmt.Println("Main goroutine finished.")
}
设置请求的超时时间
比如http请求和数据库连接超时
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
)
// ServerFunc 是一个简单的服务函数,它模拟一些耗时的操作。
func ServerFunc(w http.ResponseWriter, r *http.Request) {
// 从请求中获取上下文
ctx := r.Context()
// 设置超时时间为5秒
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// 模拟一些耗时的工作
for i := 0; ; i++ {
select {
case <-ctx.Done():
http.Error(w, "Request timed out", http.StatusRequestTimeout)
return
default:
fmt.Fprintf(w, "Working... (%d)\n", i)
time.Sleep(1 * time.Second)
}
}
}
func main() {
http.HandleFunc("/", ServerFunc)
log.Fatal(http.ListenAndServe(":8080", nil))
}
传递请求范围的值
package main
import (
"context"
"fmt"
"time"
)
// ProcessRequest 模拟处理一个带有请求范围值的请求。
func ProcessRequest(ctx context.Context) {
requestID, _ := ctx.Value("request_id").(string)
fmt.Printf("Processing request with ID: %s\n", requestID)
time.Sleep(1 * time.Second)
fmt.Println("Request processed.")
}
func main() {
ctx := context.WithValue(context.Background(), "request_id", "12345")
go ProcessRequest(ctx)
// 主goroutine等待一段时间以确保子goroutine完成
time.Sleep(2 * time.Second)
fmt.Println("Main goroutine finished.")
}
5.其他并发原语
Semaphore用于控制goroutine的数量