前言
在日常开发中, 经常会使用chan
来进行协程之间的通信. 对chan
的操作也无外乎读写关. 而本次, 就是从chan
的关闭而来.
假设我们对外提供的方法如下:
type Chan struct {
ch chan int
}
func (c *Chan) Close() {
close(c.ch)
}
func (c *Chan) Send(v int) {
c.ch <- v
}
那么, 简单写段逻辑测试一下:
func main() {
for {
c := &Chan{ch: make(chan int, 100)}
w := sync.WaitGroup{}
w.Add(2)
go func() {
defer w.Done()
c.Send(1)
}()
go func() {
defer w.Done()
c.Close()
}()
w.Wait()
}
}
很快, 你就会看到报错了:
报错的原因也很简单, 就是因为向已关闭的管道中写数据.
我们如何能实现安全的管道关闭操作呢?
注意, 本次不讨论重复调用Close
方法的情况, 这种只要加锁保证单次执行就可以了, 情况简单.
如何关闭管道
1.漏洞百出版
有的小伙伴想到了, 加个状态判断不就行了, 管道关闭之后就不再往里写咯. 并很快给出了代码:
type Chan struct {
ch chan int
isClosed bool
}
func (c *Chan) Close() {
c.isClosed = true
close(c.ch)
}
func (c *Chan) Send(v int) bool {
if c.isClosed {
return false
}
c.ch <- v
return true
}
但是, 我相信在运行之后就不会这么想了.
问题: 如果在发送时的 isClosed
判断与chan
通信之间发生了管道的关闭, 还是会出错.
2. 粗糙的正确加锁版
既然发生错误的原因是操作的原则性, 那很自然的就会想到加锁.
type Chan struct {
ch chan int
isClosed bool
lock sync.Mutex
}
func (c *Chan) Close() {
c.lock.Lock()
defer c.lock.Unlock()
c.isClosed = true
close(c.ch)
}
func (c *Chan) Send(v int) bool {
c.lock.Lock()
defer c.lock.Unlock()
if c.isClosed {
return false
}
c.ch <- v
return true
}
将isClosed
变量的访问与赋值通过加锁来实现原子操作, 那么我们直接让Close
和Send
函数全体加锁, 将其强行改为串行, 不就解决了嘛?
开心的告诉你, 没错, 这样确实解决了.而且, 加锁的粒度不能再小了,
但是, 不得不遗憾的告诉你, 这么写只能说看起来对.
问题: 因为向管道写数据是阻塞的, 当chan
满了再写数据, 就会阻塞在这里, 持有的锁就不会释放. 导致关闭函数一直无法执行.
有的小机灵鬼想到了, 使用select-case
将阻塞写改为非阻塞写不就行了嘛? 确实可以, 但这样无疑会增加调用者的成本.
3. 小机灵版本
有的小机灵想到了, 我再Send
的时候把panic
抓住处理掉不就行了么.
type Chan struct {
ch chan int
}
func (c *Chan) Close() {
close(c.ch)
}
func (c *Chan) Send(v int) (ret bool) {
defer func() {
if r := recover(); r != nil {
ret = false
}
}()
c.ch <- v
return true
}
开心的告诉你, 确实可以. 而且不存在锁的竞争问题, 很好.
但是, 在Go
的哲学中, 流程中的异常是通过error
返回, 通过捕捉panic
来实现总觉得怪怪的.
4. Sender
关闭
既然总会遇到风险, 那么我在Send
汉中中进行close
可以么?
type Chan struct {
ch chan int
needClose bool
once sync.Once
}
func (c *Chan) Close() {
c.needClose = true
}
func (c *Chan) Send(v int) (ret bool) {
if c.needClose {
c.once.Do(func() {
close(c.ch)
})
return false
}
c.ch <- v
return true
}
这样确实也是可以的, 但是别高兴的太早.
问题: 如果在Close
之后没有调用Send
方法, 那么此chan
就不会关闭, 也就无法通知到接收方了. 因此, 十分不推荐.
包括使用额外的chan
来实现, 也不推荐, 因为没有将chan
关闭, 无法通知接收方(如下):
type Chan struct {
ch chan int
done chan int
}
func (c *Chan) Close() {
close(c.done)
}
func (c *Chan) Send(v int) (ret bool) {
select {
case <-c.done:
return false
case c.ch <- v:
return true
}
}
等等吧, 其实有很多方式来实现, 在这里就不一一赘述了.
总结
回过头来再想.
- 为什么在官方的设计中, 管道不能重复关闭?
- 为什么向已关闭的管道发送数据会
panic
, 却可以从已关闭的管道读取数据?
从官方 API 的设计中, 我们可以猜到其希望调用方做的事情:
- 明确的知道自己什么时候关闭管道, 且仅有一个人来关闭
- 发送方明确的知道管道是否已经关闭, 所以在管道关闭后不会再写数据
- 接收方不知道管道的关闭情况, 因此需要通过返回值来判断
而我们关闭管道的目的是什么呢? 无非是:
- 通知接收方, 管道已经关闭, 处理完管道中的余量就可以退出了
- 通知发送方, 不要再发送数据量
那么, 我们就目的而言来分析:
- 通知接收方, 这个在官方的设计中就已经支持了, 只要保证管道不会重复
close
就行 - 通知发送方. 这里我们分两种情况来看:
- 后续没有数据需要发送. 此时貌似也不需要通知哈.
- 还有数据没有发送出去. 若还有数据的话, 不发出去是否会有问题? 是否应该等数据发完再关闭? 后续没发出去的数据如何处理?
综上, 我得出如下结论, 而这想必也是官方的意思吧:
- 管道的关闭应由发送方负责
- 发送方需在确认所有数据都发出之后再关闭管道
那么, 如果真的真的就是要在发送完之前关闭管道怎么办呢?
- 自查是否是设计问题, 导致管道的关闭者无法获得数据的发送信息
- 若流程实在无法更改, 推荐使用一个第三者(管道或锁)来将关闭状态通知给发送方和接收方, 而不关闭使用中的管道
- 因为管道本身就已经加锁了, 如果再通过加锁来实现的话无疑会造成额外的性能损耗. 甚至于我认为
recover
都要被加锁更好. - 当然了, 如果能通过修改设计来保证的话就更好了
- 因为管道本身就已经加锁了, 如果再通过加锁来实现的话无疑会造成额外的性能损耗. 甚至于我认为
以上, 如果你有什么其他意见, 烦请不吝赐教
原文地址: https://hujingnb.com/archives/888