1.Channel情况总结
在进行Channel通道使用之前,先根据总结有缓冲型channel使用的情况,若对下表有疑问可以前往Golang Channel 实现原理与源码分析进行阅读,如下所示:
从上表中我们可以发现,若我们已经对channel初始化的情况下,有两种情况会导致channel产生panic:
- 对一个已关闭的通道,进行关闭
- 对一个已关闭的通道,写入数据
1.1 防止对已关闭的通道进行关闭或写入
法一: 当channel不会有值写入时可使用select 多路复用判断channel是否关闭
当我们确定ch中不会有值进行写入时,可以通过以下函数进行判断channel是否关闭
func IsClosed(ch <-chan struct{}) bool {
select {
//因为没有channel中向写入值,故读取到值一定是零值,且此时channel已关闭
case <-ch:
return true
default:
}
return false
}
法二:使用recover 预防可能的panic
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
// 返回值可以被修改
// 在一个延时函数的调用中。
justClosed = false
}
}()
//ch <- value //SafeSend()
// 假设这里 ch != nil 。
close(ch) // 如果 ch 已经被关闭将会引发 panic
return true // <=> justClosed = true; return
}
法三:使用sync.Once确保只关闭一次通道
// MyChannel代表了一个带有安全关闭方法的channel
type MyChannel struct {
C chan T // 内部封装的channel
once sync.Once // 可确保仅关闭一次的sync.Once对象
}
// NewMyChannel函数返回一个初始化过的MyChannel指针
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
// SafeClose方法可以安全地关闭MyChannel中的channel。
// 即使这个方法被多次调用,也只会执行一次关闭操作
func (mc *MyChannel) SafeClose() {
// 调用once.Do保证该函数只被执行一次
// 其中传递的函数close(mc.C)会安全地关闭channel mc.C
mc.once.Do(func() {
close(mc.C)
})
}
2. 通道关闭原则
上文虽然给出了一些解决方法,但都有各种问题和限制,并不适合某些情况。普遍原则是不发送数据给(或关闭)一个关闭状态通道。 如果所有的 goroutine都 能保证没有 goroutine 会再发送数据给(或关闭)一个关闭的通道, 这样的话 goruoutine 可以安全地关闭通道。然而,从读取方或者多个写入方之一实现这样的保证需要花费很大的工夫,而且通常会把代码写得很复杂。
因此我们并发处理时,需要避免出现这两种情况,据此提出了通道关闭原则,如下所示
- 不允许读取方关闭通道
- 不能关闭一个有多个并发写入者的通道。
即只能在写入方的 goroutine 中关闭只有该写入方的通道,即谁写入通道,谁负责关闭
3.使用示例:生产者消费者问题
在本节中我们采用生产者——消费者问题来进行使用的示例展示。
由于通道关闭原则要求谁写入通道,谁负责关闭,那么我们可以知道对于单生产者的情况,已经满足了通道关闭原则,在唯一的生产者中关闭通道即可,因此不需要进行示例,因此以下我们将对于多生产者问题进行详细介绍。
对于多生产者的情况下,要确保通道关闭原则,我们需要增加“管理角色”的通道,介绍如下:
- 业务通道:承载数据,用于多个协程间共享数据
- 信号通道:仅为了标记业务通道是否关闭而存在
信号通道需要满足两个条件:
- 具备广播功能:当业务通道需要关闭时,关闭该信号通道,通知所有消费协程业务通道关闭,因此信号通道必须是无缓冲通道(关闭后,所有等待read 该通道的阻塞协程,都会被唤醒)。
- 唯一关闭协程:
- 多个生产者,一个消费者:业务通道的这个生产者协程,就可以关闭信号通道
- 多个生产者,多个消费者:除了信号通道,还需要再单独开启一个媒介协程关闭信号通道,且该媒介协程具有一个媒介通道
3.1 多生产者单消费者——即多个写入,一个读取
对于多个生产者,一个消费者的场景,业务通道的唯一消费者协程,可以对信号通道stopCh进行关闭
package main
import (
"math/rand"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
// 业务通道,传递业务数据
dataCh := make(chan int,10)
// 信号通道:必须是无缓冲通道,,消费者关闭信号通道来广播给所有业务通道的发送者
stopCh := make(chan struct{})
// 开启1000个业务通道的生产者协程
for i := 0; i < 1000; i++ {
go producer(dataCh, stopCh)
}
// 开启消费者协程
go func() {
defer wg.Done()
consumer(dataCh, stopCh)
}()
wg.Wait()
}
//查看通道是否关闭
func IsClosed(ch <-chan struct{}) bool {
select {
//因为没有channel中向写入值,故读取到值一定是零值,且此时channel已关闭
case <-ch:
return true
default:
}
return false
}
// 生产者
func producer(dataCh chan<- int, stopCh <-chan struct{}) {
for {
//可以确保stopCh关闭后协程不会写入dataCh
if IsClosed(stopCh) {
return
}
//select当多个case可以执行时,而是随机执行,故需要上一段代码判断stopCh是否关闭
// 当stopCh读取到值时,说明消费者已经关闭了信号通道,此时不再向业务通道发送数据
select {
case <-stopCh:
return
case dataCh <- rand.Intn(100):
}
}
}
// 消费者
func consumer(dataCh <-chan int, stopCh chan<- struct{}) {
sum := 0
//直接for range循环遍历,消费者协程可以阻塞等待
//如果通道已关闭并且其中所有值都已经被读取,则循环将自动结束
for value := range dataCh {
sum += value
if sum > 1000 {
// 当达到某个条件时,通过关闭信号通道来广播给所有业务通道的发送者
println("写入过多数据,停止写入")
close(stopCh)
return
}
}
}
我们在业务通道的基础上,添加了信号通道,在某个条件满足时,通过单个消费者协程来关闭信号通道,广播给所有业务通道的生产者停止向业务通道发送数据的功能,从而让整个并发程序得以优雅地结束。该实现利用了 Go 语言中的无缓冲通道和关闭通道的机制,可以有效避免协程因在关闭后的通道上阻塞而无法退出的问题。
3.2 多生产者多消费者——即多个写入,多个读取
对于多个生产者,多个消费者的场景,不能让任意一个生产者或者消费者关闭数据通道,因此除了添加信号通道stopCh,还需要再单独开启唯一的媒介协程关闭信号通道,媒介协程要有自己的一个媒介通道:
- 唯一的媒介协程用来关闭信号通道,进行广播
- 其发送者是:业务通道的任一发送者和接收者,其接收者是:媒介协程(是唯一的)
- 媒介通道不需要关闭,故不需要遵循通道关闭原则
package main
import (
"log"
"math/rand"
"strconv"
"sync"
)
func main() {
const NumReceivers = 10
const NumSenders = 100
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
dataCh := make(chan int, 10) //业务通道
stopCh := make(chan struct{}) //信号通道
mediatorCh := make(chan string) //媒介通道
var stoppedBy string
// 媒介协程
go mediator(mediatorCh, stopCh, &stoppedBy)
// 生产者协程
for i := 0; i < NumSenders; i++ {
go producer(strconv.Itoa(i), stopCh, dataCh, mediatorCh)
}
// 消费者协程
for i := 0; i < NumReceivers; i++ {
go func(i int) {
defer wgReceivers.Done()
consumer(strconv.Itoa(i), stopCh, dataCh, mediatorCh)
}(i)
}
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
// IsClosed 用于检查信号通道是否已经关闭。
func IsClosed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
}
return false
}
// 中介者
func mediator(mediatorCh <-chan string, stopCh chan<- struct{}, stoppedBy *string) {
*stoppedBy = <-mediatorCh
close(stopCh)
}
// 生产者
func producer(id string, stopCh <-chan struct{}, dataCh chan<- int, mediatorCh chan<- string) {
for {
value := rand.Intn(1000)
if value == 0 {
select {
case mediatorCh <- "sender#" + id:
default:
}
return
}
//判断stopCh是否关闭
if IsClosed(stopCh) {
return
}
//select默认行为非阻塞, 随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行
//防止协程阻塞
//防止向已关闭的通道发送数据导致panic
select {
case <-stopCh:
return
case dataCh <- value:
}
}
}
// 消费者
func consumer(id string, stopCh <-chan struct{}, dataCh <-chan int, mediatorCh chan<- string) {
for {
// 判断stopCh是否关闭
if IsClosed(stopCh) {
return
}
//select默认行为非阻塞, 随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行
//防止协程阻塞
//防止向已关闭的通道发送数据导致panic
select {
case <-stopCh:
return
case value := <-dataCh:
if value == 666 {
select {
case mediatorCh <- "receiver#" + id:
default:
}
return
}
}
}
}
3.3 总结
在多生产者单消费者问题中,我们可以看到我们并没有对业务通道进行关闭,我们只是在给业务通道中写入数据前判断信号通道是否关闭,如果信号通道已经关闭,则直接返回,而不向业务通道中写入数据,因此业务数据并没有关闭,那么就不会引起panic。
而在多生产者多消费者中,我们也是如此,并没有对业务通道进行关闭,我们只是在给业务通道中写入数据前判断信号通道是否关闭,但是我们此时我们需要考虑信号通道不能重复关闭,因此添加了媒介协程,通过媒介协程来对信号通道进行关闭,其他的与多生产者单消费者问题基本一致,由于业务通道没有关闭,一定不会引起panic。
而对于没有关闭的业务通道,当所有的协程正常退出时,Go 的垃圾回收会自动进行清理。
4. 实操进阶:利用channel实现高并发map
利用channel要求实现一个map:
- 面向高并发;
- 只存在插入和查询操作0(1);
- 查询时,若key 存在,直接返回val;若key不存在,阻塞直到key val对被放入后,获取val返回;
- 查询key不存在时,若等待指定时长仍未放入val,返回超时错误;
package main
import (
"fmt"
"sync"
"time"
)
type MyConcurrentMap struct {
type MyConcurrentMap struct {
mapData map[int]int //存放数据的map
keytoChan map[int]chan struct{} //存放key对应的channel,用于等待val放入
mu sync.Mutex //互斥锁,保证mapData和keytoChan的并发安全,
}
func newMyConcurrentMap() *MyConcurrentMap {
return &MyConcurrentMap{
mapData: make(map[int]int),
keytoChan: make(map[int]chan struct{}),
}
}
func (m *MyConcurrentMap) Put(k, v int) {
m.mu.Lock()
defer m.mu.Unlock()
m.mapData[k] = v
if ch, ok := m.keytoChan[k]; ok {
//如果channel没有关闭,关闭channel,关闭后唤醒所有等待的Get
if !isClosed(ch) {
close(ch)
}
return
}
}
func (m *MyConcurrentMap) Get(k int, maxWaitingDuration time.Duration) (int, error) {
m.mu.Lock()
if v, ok := m.mapData[k]; ok {
m.mu.Unlock()
return v, nil
}
//如果当前k的channel不存在,则创建channel
//因此,锁不能使用读写锁,因为在Get函数内会写入keytoChan[k]
ch, ok := m.keytoChan[k]
if !ok {
ch = make(chan struct{})
m.keytoChan[k] = ch
}
//提前解锁,防止等待读取ch时没有释放锁,造成死锁
m.mu.Unlock()
//超时返回错误
select {
case <-ch:
case <-time.After(maxWaitingDuration):
return 0, fmt.Errorf("time out")
}
//重新加锁
m.mu.Lock()
//当被唤醒后,mapdata中一定存在key,value对
v := m.mapData[k]
m.mu.Unlock()
return v, nil
}
// 判断channel是否关闭
func isClosed(ch chan struct{}) bool {
select {
case <-ch:
return true
default:
}
return false
}
// 对上述map结构体进行行高并发map测试
func main() {
//初始化map
myMap := newMyConcurrentMap()
//开启100个goroutine写入数据
for i := 0; i < 100; i++ {
go func(i int) {
myMap.Put(i, i)
}(i)
}
//开启150个goroutine读取数据,前100个读取已经存在的数据,后50个读取不存在的数据
for i := 0; i < 150; i++ {
go func(i int) {
v, err := myMap.Get(i, time.Second*2)
if err != nil {
fmt.Printf("get %d error: %v\n", i, err)
} else {
fmt.Printf("get %d success: %d\n", i, v)
}
}(i)
}
time.Sleep(time.Second * 3)
}
读取部分结果如下:
get 94 success: 94
get 96 success: 96
get 95 success: 95
get 99 success: 99
get 107 error: time out
get 143 error: time out
get 114 error: time out
get 133 error: time out
get 128 error: time out
以上代码利用 channel 和互斥锁实现了一个高并发的 map,其中使用了以下关键点:
- 使用互斥锁保证 mapData 和 keytoChan 的并发安全。
- 对于 Put 操作,直接将 key-value 放到 mapData 中。同时检查是否有等待该 key-value 的 Get读 操作,如果有检查对应的 channel是否已初始化并关闭,若未关闭则进行关闭,关闭后会唤醒所有正在等待该通道的Get读协程。这里的 channel 是通过 keytoChan 维护的,每个 key 都对应一个 channel。
- 对于 Get 操作,首先检查 mapData 中是否存在该 key,如果有则直接返回 value;否则,如果当前k的channel不存在,则创建一个新的 channel 并加入到 keytoChan 中,并解锁 mutex避免死锁。然后在 select 中等待该 channel 被关闭,即 key-value 被放入 mapData 中,并重新加锁以获取 value。
- 在等待时使用 time.After 控制超时时间,避免长时间无限等待。
- isClosed判断 channel 是否关闭,使用select非阻塞读取方式,如果能读到值则 channel 已经被