Channel通道使用进阶:通道关闭原则、生产者消费者问题、高并发map

news2024/11/18 3:35:51

1.Channel情况总结

在进行Channel通道使用之前,先根据总结有缓冲型channel使用的情况,若对下表有疑问可以前往Golang Channel 实现原理与源码分析进行阅读,如下所示:
在这里插入图片描述
从上表中我们可以发现,若我们已经对channel初始化的情况下,有两种情况会导致channel产生panic:

  1. 对一个已关闭的通道,进行关闭
  2. 对一个已关闭的通道,写入数据

1.1 防止对已关闭的通道进行关闭或写入

法一: 当channel不会有值写入时可使用select 多路复用判断channel是否关闭

当我们确定ch中不会有值进行写入时,可以通过以下函数进行判断channel是否关闭

func IsClosed(ch <-chan struct{}) bool {
    select {
    //因为没有channel中向写入值,故读取到值一定是零值,且此时channel已关闭
    case <-ch:
        return true
    default:
    }
    return false
}

法二:使用recover 预防可能的panic

func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
        if recover() != nil {
            // 返回值可以被修改
            // 在一个延时函数的调用中。
            justClosed = false
        }
    }()
    //ch <- value //SafeSend()
    // 假设这里 ch != nil 。
    close(ch)   // 如果 ch 已经被关闭将会引发 panic
    
    return true // <=> justClosed = true; return
}

法三:使用sync.Once确保只关闭一次通道

// MyChannel代表了一个带有安全关闭方法的channel
type MyChannel struct {
    C    chan T     // 内部封装的channel
    once sync.Once // 可确保仅关闭一次的sync.Once对象
}

// NewMyChannel函数返回一个初始化过的MyChannel指针
func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

// SafeClose方法可以安全地关闭MyChannel中的channel。
// 即使这个方法被多次调用,也只会执行一次关闭操作
func (mc *MyChannel) SafeClose() {
    // 调用once.Do保证该函数只被执行一次
    // 其中传递的函数close(mc.C)会安全地关闭channel mc.C
    mc.once.Do(func() {
        close(mc.C)
    })
}

2. 通道关闭原则

上文虽然给出了一些解决方法,但都有各种问题和限制,并不适合某些情况。普遍原则是不发送数据给(或关闭)一个关闭状态通道。 如果所有的 goroutine都 能保证没有 goroutine 会再发送数据给(或关闭)一个关闭的通道, 这样的话 goruoutine 可以安全地关闭通道。然而,从读取方或者多个写入方之一实现这样的保证需要花费很大的工夫,而且通常会把代码写得很复杂。

因此我们并发处理时,需要避免出现这两种情况,据此提出了通道关闭原则,如下所示

  1. 不允许读取方关闭通道
  2. 不能关闭一个有多个并发写入者的通道。

即只能在写入方的 goroutine 中关闭只有该写入方的通道,即谁写入通道,谁负责关闭

3.使用示例:生产者消费者问题

在本节中我们采用生产者——消费者问题来进行使用的示例展示。

由于通道关闭原则要求谁写入通道,谁负责关闭,那么我们可以知道对于单生产者的情况,已经满足了通道关闭原则,在唯一的生产者中关闭通道即可,因此不需要进行示例,因此以下我们将对于多生产者问题进行详细介绍。

对于多生产者的情况下,要确保通道关闭原则,我们需要增加“管理角色”的通道,介绍如下:

  • 业务通道:承载数据,用于多个协程间共享数据
  • 信号通道:仅为了标记业务通道是否关闭而存在

信号通道需要满足两个条件:

  1. 具备广播功能:当业务通道需要关闭时,关闭该信号通道,通知所有消费协程业务通道关闭,因此信号通道必须是无缓冲通道(关闭后,所有等待read 该通道的阻塞协程,都会被唤醒)。
  2. 唯一关闭协程
    • 多个生产者,一个消费者:业务通道的这个生产者协程,就可以关闭信号通道
    • 多个生产者,多个消费者:除了信号通道,还需要再单独开启一个媒介协程关闭信号通道,且该媒介协程具有一个媒介通道

3.1 多生产者单消费者——即多个写入,一个读取

对于多个生产者,一个消费者的场景,业务通道的唯一消费者协程,可以对信号通道stopCh进行关闭

package main

import (
	"math/rand"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1)

	// 业务通道,传递业务数据
	dataCh := make(chan int,10)

	// 信号通道:必须是无缓冲通道,,消费者关闭信号通道来广播给所有业务通道的发送者
	stopCh := make(chan struct{})

	// 开启1000个业务通道的生产者协程
	for i := 0; i < 1000; i++ {
		go producer(dataCh, stopCh)

	}

	// 开启消费者协程
	go func() {
		defer wg.Done()
		consumer(dataCh, stopCh)
	}()

	wg.Wait()
}

//查看通道是否关闭
func IsClosed(ch <-chan struct{}) bool {
	select {
	//因为没有channel中向写入值,故读取到值一定是零值,且此时channel已关闭
	case <-ch:
		return true
	default:
	}
	return false
}

// 生产者
func producer(dataCh chan<- int, stopCh <-chan struct{}) {
	for {

		//可以确保stopCh关闭后协程不会写入dataCh
		if IsClosed(stopCh) {
			return
		}
		//select当多个case可以执行时,而是随机执行,故需要上一段代码判断stopCh是否关闭
		// 当stopCh读取到值时,说明消费者已经关闭了信号通道,此时不再向业务通道发送数据
		select {
		case <-stopCh:
			return
		case dataCh <- rand.Intn(100):
		}
	}
}

// 消费者
func consumer(dataCh <-chan int, stopCh chan<- struct{}) {
	sum := 0
	//直接for range循环遍历,消费者协程可以阻塞等待
	//如果通道已关闭并且其中所有值都已经被读取,则循环将自动结束
	for value := range dataCh {
		sum += value
		if sum > 1000 {
			// 当达到某个条件时,通过关闭信号通道来广播给所有业务通道的发送者
			println("写入过多数据,停止写入")
			close(stopCh)
			return
		}
	}
}

我们在业务通道的基础上,添加了信号通道,在某个条件满足时,通过单个消费者协程来关闭信号通道,广播给所有业务通道的生产者停止向业务通道发送数据的功能,从而让整个并发程序得以优雅地结束。该实现利用了 Go 语言中的无缓冲通道和关闭通道的机制,可以有效避免协程因在关闭后的通道上阻塞而无法退出的问题。

3.2 多生产者多消费者——即多个写入,多个读取

对于多个生产者,多个消费者的场景,不能让任意一个生产者或者消费者关闭数据通道,因此除了添加信号通道stopCh,还需要再单独开启唯一的媒介协程关闭信号通道,媒介协程要有自己的一个媒介通道

  • 唯一的媒介协程用来关闭信号通道,进行广播
  • 其发送者是:业务通道的任一发送者和接收者,其接收者是:媒介协程(是唯一的)
  • 媒介通道不需要关闭,故不需要遵循通道关闭原则
package main

import (
	"log"
	"math/rand"
	"strconv"
	"sync"
)

func main() {
	const NumReceivers = 10
	const NumSenders = 100

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	dataCh := make(chan int, 10)    //业务通道
	stopCh := make(chan struct{})   //信号通道
	mediatorCh := make(chan string) //媒介通道

	var stoppedBy string

	// 媒介协程
	go mediator(mediatorCh, stopCh, &stoppedBy)

	// 生产者协程
	for i := 0; i < NumSenders; i++ {
		go producer(strconv.Itoa(i), stopCh, dataCh, mediatorCh)
	}

	// 消费者协程
	for i := 0; i < NumReceivers; i++ {
		go func(i int) {
			defer wgReceivers.Done()
			consumer(strconv.Itoa(i), stopCh, dataCh, mediatorCh)
		}(i)
	}

	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}

// IsClosed 用于检查信号通道是否已经关闭。
func IsClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

// 中介者
func mediator(mediatorCh <-chan string, stopCh chan<- struct{}, stoppedBy *string) {
	*stoppedBy = <-mediatorCh
	close(stopCh)
}

// 生产者
func producer(id string, stopCh <-chan struct{}, dataCh chan<- int, mediatorCh chan<- string) {
	for {
		value := rand.Intn(1000)
		if value == 0 {
			select {
			case mediatorCh <- "sender#" + id:
			default:
			}
			return
		}

		//判断stopCh是否关闭
		if IsClosed(stopCh) {
			return
		}

		//select默认行为非阻塞, 随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行
		//防止协程阻塞
		//防止向已关闭的通道发送数据导致panic
		select {
		case <-stopCh:
			return
		case dataCh <- value:
		}
	}
}

// 消费者
func consumer(id string, stopCh <-chan struct{}, dataCh <-chan int, mediatorCh chan<- string) {
	for {
		// 判断stopCh是否关闭
		if IsClosed(stopCh) {
			return
		}

		//select默认行为非阻塞, 随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行
		//防止协程阻塞
		//防止向已关闭的通道发送数据导致panic
		select {
		case <-stopCh:
			return
		case value := <-dataCh:
			if value == 666 {

				select {
				case mediatorCh <- "receiver#" + id:
				default:
				}
				return
			}
		}
	}
}

3.3 总结

在多生产者单消费者问题中,我们可以看到我们并没有对业务通道进行关闭,我们只是在给业务通道中写入数据前判断信号通道是否关闭,如果信号通道已经关闭,则直接返回,而不向业务通道中写入数据,因此业务数据并没有关闭,那么就不会引起panic。

而在多生产者多消费者中,我们也是如此,并没有对业务通道进行关闭,我们只是在给业务通道中写入数据前判断信号通道是否关闭,但是我们此时我们需要考虑信号通道不能重复关闭,因此添加了媒介协程,通过媒介协程来对信号通道进行关闭,其他的与多生产者单消费者问题基本一致,由于业务通道没有关闭,一定不会引起panic。

而对于没有关闭的业务通道,当所有的协程正常退出时,Go 的垃圾回收会自动进行清理。

4. 实操进阶:利用channel实现高并发map

利用channel要求实现一个map:

  1. 面向高并发;
  2. 只存在插入和查询操作0(1);
  3. 查询时,若key 存在,直接返回val;若key不存在,阻塞直到key val对被放入后,获取val返回;
  4. 查询key不存在时,若等待指定时长仍未放入val,返回超时错误;
package main

import (
	"fmt"
	"sync"
	"time"
)

type MyConcurrentMap struct {
type MyConcurrentMap struct {
	mapData   map[int]int			//存放数据的map
	keytoChan map[int]chan struct{}	//存放key对应的channel,用于等待val放入
	mu        sync.Mutex			//互斥锁,保证mapData和keytoChan的并发安全,
}


func newMyConcurrentMap() *MyConcurrentMap {
	return &MyConcurrentMap{
		mapData:   make(map[int]int),
		keytoChan: make(map[int]chan struct{}),
	}
}

func (m *MyConcurrentMap) Put(k, v int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.mapData[k] = v
	if ch, ok := m.keytoChan[k]; ok {
		//如果channel没有关闭,关闭channel,关闭后唤醒所有等待的Get
		if !isClosed(ch) {
			close(ch)
		}
		return
	}

}

func (m *MyConcurrentMap) Get(k int, maxWaitingDuration time.Duration) (int, error) {
	m.mu.Lock()
	if v, ok := m.mapData[k]; ok {
		m.mu.Unlock()
		return v, nil
	}
	//如果当前k的channel不存在,则创建channel
	//因此,锁不能使用读写锁,因为在Get函数内会写入keytoChan[k]
	ch, ok := m.keytoChan[k]
	if !ok {
		ch = make(chan struct{})
		m.keytoChan[k] = ch
	}

	//提前解锁,防止等待读取ch时没有释放锁,造成死锁
	m.mu.Unlock()

	//超时返回错误
	select {
	case <-ch:

	case <-time.After(maxWaitingDuration):
		return 0, fmt.Errorf("time out")
	}
	//重新加锁
	m.mu.Lock()
	//当被唤醒后,mapdata中一定存在key,value对
	v := m.mapData[k]
	m.mu.Unlock()
	return v, nil
}

// 判断channel是否关闭
func isClosed(ch chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}
// 对上述map结构体进行行高并发map测试
func main() {
	//初始化map
	myMap := newMyConcurrentMap()
	//开启100个goroutine写入数据
	for i := 0; i < 100; i++ {
		go func(i int) {
			myMap.Put(i, i)
		}(i)
	}

	//开启150个goroutine读取数据,前100个读取已经存在的数据,后50个读取不存在的数据
	for i := 0; i < 150; i++ {
		go func(i int) {
			v, err := myMap.Get(i, time.Second*2)
			if err != nil {
				fmt.Printf("get %d error: %v\n", i, err)
			} else {
				fmt.Printf("get %d success: %d\n", i, v)
			}
		}(i)
	}
	time.Sleep(time.Second * 3)
}

读取部分结果如下:
get 94 success: 94
get 96 success: 96
get 95 success: 95
get 99 success: 99
get 107 error: time out
get 143 error: time out
get 114 error: time out
get 133 error: time out
get 128 error: time out

以上代码利用 channel 和互斥锁实现了一个高并发的 map,其中使用了以下关键点:

  1. 使用互斥锁保证 mapData 和 keytoChan 的并发安全。
  2. 对于 Put 操作,直接将 key-value 放到 mapData 中。同时检查是否有等待该 key-value 的 Get读 操作,如果有检查对应的 channel是否已初始化并关闭,若未关闭则进行关闭,关闭后会唤醒所有正在等待该通道的Get读协程。这里的 channel 是通过 keytoChan 维护的,每个 key 都对应一个 channel。
  3. 对于 Get 操作,首先检查 mapData 中是否存在该 key,如果有则直接返回 value;否则,如果当前k的channel不存在,则创建一个新的 channel 并加入到 keytoChan 中,并解锁 mutex避免死锁。然后在 select 中等待该 channel 被关闭,即 key-value 被放入 mapData 中,并重新加锁以获取 value。
  4. 在等待时使用 time.After 控制超时时间,避免长时间无限等待。
  5. isClosed判断 channel 是否关闭,使用select非阻塞读取方式,如果能读到值则 channel 已经被

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/641894.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

测试老鸟整理,Selenium自动化测试POM模式分层实战(详细)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 POM是Page Object…

3. SpringCloudAlibaba、nacos 实现配置中心

一、微服务中配置文件的问题 1.1 配置文件的问题&#xff1a; 配置文件的数量会随着服务的增加持续递增单个配置文件无法区分多个运行环境配置文件内容无法动态更新&#xff0c;需要重启服务 1.2 引入配置中心 引入配置中心&#xff1a;刚才架构就会成为这样。是由配置中心统…

ASP.NET Core Web API入门之二:Swagger详细使用路由设置

ASP.NET Core Web API入门之二&#xff1a;Swagger详细使用 一、引言二、Swagger的作用以及优点2.1 作用2.2 优点 三、API接口添加注释3.1 编辑项目文件3.2 修改 Startup.cs 文件的 ConfigureServices 方法3.3 修改浏览器的网页标题3.4 注册路由中间件3.4 接口添加注释 四、运行…

基于html+css的图展示125

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

项目管理专业人员能力评价(CSPM)好考吗?考试时间什么时候?

2021年10月&#xff0c;中共中央、国务院发布的《国家标准化发展纲要》明确提出构建多层次从业人员培养培训体系&#xff0c;开展专业人才培养培训和国家质量基础设施综合教育。建立健全人才的职业能力评价和激励机制。由中国标准化协会&#xff08;CAS&#xff09;组织开展的项…

和数集团Baas服务如何推动区块链技术应用和产业发展?

近日&#xff0c;《区块链和分布式记账技术 参考架构》(GB/T 42752-2023)国家标准正式发布。这是我国首个获批发布的区块链技术领域国家标准。该标准在区块链技术应用和产业发展方面提出了参考架构规范&#xff0c;包括用户视图、功能视图、实现视图和部署视图。在功能架构方面…

启程阿拉德之怒三端架设教程

阿拉德之怒是一款横版动作冒险RPG手游&#xff0c;游戏采用虚拟按键模式呈现指尖上的连击盛宴&#xff0c;波动血气等不同的奥义带来多系转职技能&#xff0c;讲述不同时空交错的节点诞生的大陆之上&#xff0c;来个各界的强者们汇聚在一起冒险战斗故事&#xff0c;领悟鬼手之力…

基于ubuntu22.04-深入浅出 eBPF

笔者在很早之前就看eBPF这类似的文章&#xff0c;那时候看这个技术一脸懵逼&#xff0c;不知道它是用来做什么&#xff0c;可以解决什么问题。所以也没有太关注这个技术。很庆幸最近刚好有机会研究这个技术。 什么是BPF BPF的全称是Berkaley Packet Filter,即伯克利报文过法器…

计算机视觉的应用7-利用YOLOv5模型启动电脑摄像头进行目标检测

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下计算机视觉的应用7-利用YOLOv5模型启动电脑摄像头进行目标检测&#xff0c;本文将详细介绍YOLOv5模型的原理&#xff0c;YOLOv5模型的结构&#xff0c;并展示如何利用电脑摄像头进行目标检测。文章将提供样例代码&a…

【GitLab】-HTTP Basic: Access denied.remote:You must use a personal access token

写在前面 本文简要说明GitLab配置accessToken以及双因子认证&#xff08;Two-factor authentication&#xff09;。 目录 写在前面一、场景描述二、具体步骤1.环境说明2.配置accessToken3.克隆项目4.双因子认证 三、参考资料写在后面 一、场景描述 在使用账号和密码的方式拉取公…

数据库的 Schema 变更实现

一、减少元数据变更的措施 元数据变更是数据库管理中不可避免的工作项&#xff0c;减少元数据变更次数可降低数据库维护和管理成本&#xff0c;减轻对业务的影响。这里我们可以优先考虑以下 3 点&#xff1a; 精细计划 在数据库设计和开发阶段&#xff0c;精细设计元数据结构…

月度精华汇总 | 最新XR行业资讯、场景案例、活动都在这一篇里啦!

​ 在过去的一个月中&#xff0c;平行云为您带来了关于XR领域的一系列精彩文章&#xff0c;涵盖了行业资讯、应用案例&#xff0c;市场互动&#xff0c;帮助您掌握XR领域最新动态&#xff0c;了解实时云渲染、Cloud XR技术的价值&#xff0c;以及平行云实时云渲染解决方案LarkX…

【每日算法】【160. 相交链表】

☀️博客主页&#xff1a;CSDN博客主页 &#x1f4a8;本文由 我是小狼君 原创&#xff0c;首发于 CSDN&#x1f4a2; &#x1f525;学习专栏推荐&#xff1a;面试汇总 ❗️游戏框架专栏推荐&#xff1a;游戏实用框架专栏 ⛅️点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd;&…

【陈老板赠书活动 - 04期】- 【C++、Linux、算法等系列众书】

陈老老老板&#x1f9b8; &#x1f468;‍&#x1f4bb;本文专栏&#xff1a;赠书活动专栏&#xff08;为大家争取的福利&#xff0c;免费送书&#xff09; &#x1f468;‍&#x1f4bb;本文简述&#xff1a;与几分醉意.一起搞的赠书活动一次30本书哦&#xff01;&#xff01;…

软件项目质量管理的4大注意事项

1、制定质量计划和评估标准 项目质量管理首先需要制定详细的质量计划&#xff0c;明确项目质量目标&#xff0c;制定质量评估标准和验收方案。质量计划需与项目计划密切相关&#xff0c;并确保项目质量管理与项目进度和成本控制相配合。 软件项目质量管理的4大注意事项 2、构建…

Java+Swing+mysql仿QQ聊天工具

JavaSwingmysql仿QQ聊天工具 一、系统介绍二、功能展示1.用户登陆2.好友列表3.好友聊天4.服务器日志 三、系统实现四、其它1.其他系统实现2.获取源码 一、系统介绍 系统主要功能&#xff1a;用户登陆、好友列表、好友聊天、服务器日志 二、功能展示 1.用户登陆 2.好友列表 3…

想要避免计划外停机?预测性维护技术是关键

在现代工业领域&#xff0c;非计划停机是一项令人头疼的问题&#xff0c;它导致生产损失、利润减少&#xff0c;并给运营团队带来巨大的压力。然而&#xff0c;基于时间的维护策略并不能有效应对所有设备故障&#xff0c;因为大部分故障表现出随机模式&#xff0c;难以准确预测…

深入探索基于Webdriver的分层自动化框架搭建

目录 前言&#xff1a; 1、基于webdriver的分层自动化框架及平台搭建&#xff0c;目前刚好在做这一块的工作&#xff0c;对于分层次和平台搭建&#xff0c;想问下大神有什么好的建议&#xff1f; 2、希望大神能自己的工作经历和经验&#xff0c;对初入测试行业的后辈有何建议…

SpringCloudAlibaba环境搭建版本说明

可以通过www.github.com网站搜索alibaba&#xff0c;点击第一个超链接 点击wiki 点击版本说明 里面有对应版本&#xff1a; 也可以通过版本说明 alibaba/spring-cloud-alibaba Wiki GitHub这个链接直接访问

模板学堂|DataEase地图视图功能详解

DataEase开源数据可视化分析平台于2022年6月正式发布模板市场&#xff08;https://dataease.io/templates/&#xff09;。模板市场旨在为DataEase用户提供专业、美观、拿来即用的仪表板模板&#xff0c;方便用户根据自身的业务需求和使用场景选择对应的仪表板模板&#xff0c;并…