16 go语言(golang) - 并发编程select和workerpool

news2024/11/29 6:41:55

select

在Go语言中,select语句用于处理多个channel的操作。它类似于switch语句,但专门用于channel通信。通过使用select,可以同时等待多个channel操作,并在其中一个操作准备好时执行相应的代码块。这对于需要处理并发任务和协调goroutine之间的通信非常有用。

基本用法

  • 多路复用:同时监听多个channel上的数据传输。
  • 非阻塞选择:如果没有任何case可以执行,可以使用default分支来实现非阻塞行为。
  • 超时控制:结合time.After函数,可以实现超时机制。
select {
case <-ch1:
    // 如果ch1成功读取到数据,则执行该case
case ch2 <- x:
    // 如果x成功发送到ch2,则执行该case
default:
    // 如果上面都没有成功,则进入default处理流程
}

具体示例

  • 使用select语句来等待任意一个channel的数据传入,并根据哪个通道先收到消息来决定执行哪个分支。
func Test1(t *testing.T) {
	ch1 := make(chan string)
	ch2 := make(chan string)
	ch3 := make(chan string)

	go func() {
		time.Sleep(100 * time.Millisecond)
		ch1 <- "线程1"
	}()

	go func() {
		time.Sleep(200 * time.Millisecond)
		ch2 <- "线程2"
	}()

	go func() {
		// 只管消费数据,阻塞着一直消费
		for range ch3 {
		}
	}()

	for i := 0; i < 5; i++ {
		select {
		// 这样写会有双重读取的问题
		// 因为尝试从 ch1 接收数据,然后立即再次接收数据并打印。这会导致问题,因为第二次接收是在没有检查是否有数据可用的情况下进行的,这可能会阻塞程序。
		//case <-ch1:
		//	value := <-ch1
		//	fmt.Printf("接受到来自 %s 的消息 \n", value)
		case value := <-ch1:
			fmt.Printf("接受到来自 %s 的消息 \n", value)
		case value := <-ch2:
			fmt.Printf("接受到来自 %s 的消息 \n", value)
		case ch3 <- "msg":
			time.Sleep(800 * time.Millisecond)
			fmt.Printf("尝试向ch3发送消息 \n")
		}
	}

}

输出

尝试向ch3发送消息 
接受到来自 线程2 的消息 
尝试向ch3发送消息 
尝试向ch3发送消息 
尝试向ch3发送消息 

注意:selectcase不是顺序执行的,在Go语言中,select语句的行为与switch语句不同。它并不是按顺序从上到下检查每个case,而是随机选择一个可以执行的case。这种设计是为了避免优先级问题,从而使得所有可用的channel操作都有平等的机会被选中。

default

func Test2(t *testing.T) {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(100 * time.Millisecond)
		ch1 <- "线程1"
	}()

	go func() {
		time.Sleep(200 * time.Millisecond)
		ch2 <- "线程2"
	}()

	for i := 0; i < 3; i++ {
		select {
		case value := <-ch1:
			fmt.Printf("接受到来自 %s 的消息 \n", value)
		case value := <-ch2:
			fmt.Printf("接受到来自 %s 的消息 \n", value)
		default:
			fmt.Println("没有数据")
			time.Sleep(1000 * time.Millisecond) // 没有等待的话会直接输出三次『没有数据』
		}
	}

}

工作原理

  1. 随机选择
    • 当有多个case都准备好时,Go会随机选择其中一个进行执行。这意味着如果有多个channel同时可以接收或发送数据,具体哪个case会被选中是不可预测的。
    • default分支比较特殊,只有在没有其他case可以执行时才会被选择。因此,它并不是与其他case一起随机选择的,而是作为一种“兜底”机制。
  2. 非阻塞检查
    • 如果没有任何channel操作可以立即进行,并且提供了default分支,那么default分支将被执行。
    • 如果没有default分支,则select将阻塞直到某个channel操作可以进行。
    • 如果所有channel操作都无法立即进行(即没有可用的数据接收或发送),且存在一个default分支,那么select将立即执行该默认分支,而不会阻塞。
  3. 公平性
    • 这种随机选择机制确保了所有准备好的通道都有机会被处理,而不会因为代码中的位置而导致某些通道总是优先于其他通道。

超时控制

  • 使用 time.After 函数可以实现对某个操作设置超时时间。如果在指定时间内没有接收到数据,可以执行超时逻辑。
func Test3(t *testing.T) {
	ch := make(chan string)

	go func() {
		var random = rand.Intn(2000)
		// 模拟随机等待0到2秒
		time.Sleep(time.Duration(random) * time.Millisecond)
		ch <- "msg"
	}()

	select {
	case msg := <-ch:
		fmt.Println("接受到消息:", msg)
	case <-time.After(1 * time.Second):
		fmt.Println("Timeout!")
	}
}

关闭通道检测

  • 当一个通道被关闭并且所有的数据都被读取完毕后,再次读取会立即返回零值,这可以通过 select 来检测。
func Test4(t *testing.T) {
	ch := make(chan string)
	go func() {
		ch <- "msg"
	}()

	select {
	case msg, ok := <-ch:
		if ok {
			fmt.Println("接受到消息:", msg)
		} else {
			fmt.Println("channel 已经关闭")
		}
	}

	close(ch)

	select {
	case msg, ok := <-ch:
		if ok {
			fmt.Println("接受到消息:", msg)
		} else {
			fmt.Println("channel 已经关闭")
		}
	}

}

workerpool

在Golang中,Worker Pool是一种并发编程模式,用于限制同时运行的goroutine数量,以控制资源使用和提高程序的性能。通过使用Worker Pool,可以有效地管理任务执行,避免因过多的goroutine导致系统资源耗尽。

基本原理

  1. 任务队列:一个用于存储待处理任务的通道。
  2. Workers:一组固定数量的goroutine,从任务队列中获取任务并执行。
  3. 结果收集:通常会有一个结果通道,用于收集每个worker完成后的结果。

工作流程

  • 主线程将所有待处理的任务放入到任务队列中。
  • 一组预先启动好的worker从该队列中获取任务进行处理。
  • 每个worker在完成其当前工作后,会继续从队列中获取下一个可用的工作,直到所有工作都被完成。

与线程池的区别

Worker Pool与线程池在概念上非常相似。两者都是用于管理并发任务执行的设计模式,旨在限制同时运行的工作单元(goroutines或线程)的数量,以有效利用系统资源并提高性能。

相似

  1. 资源管理:都用于限制并发执行单元(如goroutine或线程)的数量,从而控制对系统资源(如CPU、内存等)的使用。
  2. 复用工作单元:通过复用已有的工作单元来减少创建和销毁它们所带来的开销,提高效率。
  3. 异步执行:允许提交大量任务,并由池中的工作单元异步地完成这些任务。

不同

  1. 实现机制

    • 在Golang中,Worker Pool通常基于goroutines实现,而不是操作系统级别的线程。这使得Golang中的Worker Pool更加轻量级,因为goroutine比传统线程更小且启动速度更快。
    • 传统语言中的线程池通常直接基于操作系统提供的线程模型,这可能会导致较高的上下文切换开销和内存消耗。
  2. 调度方式

    • Golang有自己的调度器来管理goroutines,它可以自动将数千个甚至更多个goroutine映射到少量OS线程上运行。
    • 传统语言中的线程池依赖于操作系统调度器来管理和分配CPU时间片给各个线程。

实现

Golang标准库中没有内置的专门用于实现Worker Pool的包或功能。不过,Golang提供了强大的goroutine和channel机制,使得实现自定义的Worker Pool变得相对简单。

Worker Pool(工作池)的实现思路主要围绕如何有效管理和调度一组有限的工作者(goroutine)来执行任务。

1、定义 Worker Pool 结构

首先,定义一个 WorkerPool 结构,它包含以下元素:

  • 最大工作者数(maxWorkers:控制同时运行的 goroutine 的最大数量。
  • 任务队列(taskQueue:用于存储待处理任务,通常使用channel来实现。
  • 停止信号(stopSignal:一个通道,用于发送停止信号给所有工作者,让它们停止执行。
  • 同步机制:如互斥锁(sync.Mutex)或 WaitGroup(sync.WaitGroup),用于同步和等待所有工作者完成。
// WorkerPool
// 池
type WorkerPool struct {
	maxWorkerNums int
	taskQueue     chan Task
	stopSignal    chan int // 停止信号,接受到数据时时停止
	waitGroup     sync.WaitGroup

	// 保证停止后不能再提交任务
	isStop bool
	mu     sync.Mutex
}

2、初始化 Worker Pool

实现一个 New 函数来初始化 WorkerPool,设置初始状态,并启动一定数量的工作者。

  • 根据 maxWorkers 初始化工作者队列。
  • 创建 taskQueue
// NewWorkerPool
/* 初始化池
 */
func NewWorkerPool(maxWorkerNums int) WorkerPool {
	return WorkerPool{maxWorkerNums,
		make(chan Task),
		make(chan int, maxWorkerNums),
		sync.WaitGroup{},
		false,
		sync.Mutex{},
	}
}

3、提交任务

实现一个 Submit 方法,用于提交任务到 Worker Pool。

func (p *WorkerPool) submit(task Task) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// 前提是队列还没有关闭的情况下,提交任务
	if !p.isStop {
		p.taskQueue <- task
	}
}

4、启动池

工作者运行在一个无限循环中,不断从 workerQueue 中取出任务并执行。

  • 使用 for 循环和 select 语句监听 workerQueue 中的任务。
  • 执行任务,然后等待下一个任务。
  • 如果接收到任务队列关闭或 stopSignal,工作者退出循环。
// worker执行任务
func (p *WorkerPool) worker(num int) {
	defer p.waitGroup.Done()
	for {
		select {
		case <-p.stopSignal:
			// 接受到停止信号
			fmt.Printf("【%d号】接受到停止讯号,结束!\n", num)
			return
		case task, ok := <-p.taskQueue:
			if ok {
				// 具体的业务逻辑
				process(task, num)
			} else {
				// 任务队列被关闭了,表示没有任务了
				fmt.Printf("【%d号】完成!\n", num)
				return
			}
		default:
			// 暂时没有任务,睡眠10毫秒
			fmt.Println("!空闲!没有任务")
			time.Sleep(time.Millisecond * 100)
		}
	}
}

5、停止

实现一个 Stop 方法,用于优雅地关闭 Worker Pool。

  • 发送停止信号给所有工作者,让它们结束循环。
  • 使用 WaitGroup 等待所有工作者完成当前任务。
  • 关闭任务队列和工作者队列。
func (p *WorkerPool) stop() {
	p.mu.Lock()
	defer p.mu.Unlock()
	// 发送停止讯号
	for i := 0; i < p.maxWorkerNums; i++ {
		p.stopSignal <- 1
	}

	if !p.isStop { // 确保只关闭一次。
		p.isStop = true
		close(p.taskQueue)
	}

	p.waitGroup.Wait()
	close(p.stopSignal)
}

6、具体任务和业务逻辑

// Task 具体需要执行单元任务
type Task interface{}

func process(t Task, num int) {
	fmt.Printf("...【%d号】正在处理任务:%v\n", num, t)
	// 模拟耗时操作
	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	fmt.Printf("✓完成任务:%v\n", t)
}

7、启动测试程序

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	// 初始化一个工作者池
	pool := NewWorkerPool(5)
	// 初始化并开始工作
	pool.start()

	// 模拟发送1000个任务
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		for i := 0; i < 1000; i++ {
			pool.submit(fmt.Sprintf("任务%d", i))
		}
	}()
	wg.Done()
	wg.Wait()

	// 模拟程序运行中
	time.Sleep(2 * time.Second)

	// 强制停止程序
	pool.stop()

	fmt.Println("main结束")
}

输出:

!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
...【1号】正在处理任务:任务0
!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
...【3号】正在处理任务:任务1
...【0号】正在处理任务:任务2
!空闲!没有任务
...【4号】正在处理任务:任务3
✓完成任务:任务3
...【4号】正在处理任务:任务4
...【2号】正在处理任务:任务5
✓完成任务:任务4
...【4号】正在处理任务:任务6
✓完成任务:任务6
...【4号】正在处理任务:任务7
✓完成任务:任务2
...【0号】正在处理任务:任务8
✓完成任务:任务5
...【2号】正在处理任务:任务9
✓完成任务:任务0
...【1号】正在处理任务:任务10
✓完成任务:任务7
...【4号】正在处理任务:任务11
✓完成任务:任务1
...【3号】正在处理任务:任务12
✓完成任务:任务8
...【0号】正在处理任务:任务13
✓完成任务:任务11
...【4号】正在处理任务:任务14
✓完成任务:任务10
...【1号】正在处理任务:任务15
✓完成任务:任务12
...【3号】正在处理任务:任务16
✓完成任务:任务13
...【0号】正在处理任务:任务17
✓完成任务:任务14
...【4号】正在处理任务:任务18
✓完成任务:任务17
...【0号】正在处理任务:任务19
✓完成任务:任务9
...【2号】正在处理任务:任务20
✓完成任务:任务15
...【1号】正在处理任务:任务21
✓完成任务:任务18
...【4号】正在处理任务:任务22
✓完成任务:任务21
...【1号】正在处理任务:任务23
✓完成任务:任务22
...【4号】正在处理任务:任务24
✓完成任务:任务24
...【4号】正在处理任务:任务25
✓完成任务:任务16
...【3号】正在处理任务:任务26
✓完成任务:任务23
...【1号】正在处理任务:任务27
✓完成任务:任务20
...【2号】正在处理任务:任务28
✓完成任务:任务26
【3号】完成!
✓完成任务:任务19
【0号】完成!
✓完成任务:任务27
【1号】完成!
✓完成任务:任务25
【4号】接受到停止讯号,结束!
✓完成任务:任务28
【2号】接受到停止讯号,结束!
main结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2249576.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用docker搭建hysteria2服务端

源链接&#xff1a;https://github.com/apernet/hysteria/discussions/1248 官网地址&#xff1a;https://v2.hysteria.network/zh/docs/getting-started/Installation/ 首选需要安装docker和docker compose 切换到合适的目录 cd /home创建文件夹 mkdir hysteria创建docke…

基于Java实现的潜艇大战游戏

基于Java实现的潜艇大战游戏 一.需求分析 1.1 设计任务 本次游戏课程设计小组成员团队合作的方式&#xff0c;通过游戏总体分析设计&#xff0c;场景画面的绘制&#xff0c;游戏事件的处理&#xff0c;游戏核心算法的分析实现&#xff0c;游戏的碰撞检测&#xff0c;游戏的反…

课题组自主发展了哪些CMAQ模式预报相关的改进技术?

空气污染问题日益受到各级政府以及社会公众的高度重视&#xff0c;从实时的数据监测公布到空气质量数值预报及预报产品的发布&#xff0c;我国在空气质量监测和预报方面取得了一定进展。随着计算机技术的高速发展、空气污染监测手段的提高和人们对大气物理化学过程认识的深入&a…

深入解析下oracle date底层存储方式

之前我们介绍了varchar2和char的数据库底层存储格式&#xff0c;今天我们介绍下date类型的数据存储格式&#xff0c;并通过测试程序快速获取一个日期。 一、环境搭建 1.1&#xff0c;创建表 我们还是创建一个测试表t_code&#xff0c;并插入数据&#xff1a; 1.2&#xff0c;…

【论文复现】SRGAN

1. 项目结构 如何生成文件夹的文件目录呢? 按住shift键,右击你要生成目录的文件夹,选择“在此处打开Powershell窗口” 在命令窗口里输入命令“tree”,按回车。就会显示出目录结构。 ├─.idea │ └─inspectionProfiles ├─benchmark_results ├─data │ ├─test …

Kubernetes 之 Ingress 和 Service 的异同点

1. 概念与作用 1.1 Ingress Ingress 是什么&#xff1f; Ingress主要负责七层负载&#xff0c;将外部 HTTP/HTTPS 请求路由到集群内部的服务。它可以基于域名和路径定义规则&#xff0c;从而将外部请求分配到不同的服务。 ingress作用 提供 基于 HTTP/HTTPS 的路由。 支持 …

结构体详解+代码展示

系列文章目录 &#x1f388; &#x1f388; 我的CSDN主页:OTWOL的主页&#xff0c;欢迎&#xff01;&#xff01;&#xff01;&#x1f44b;&#x1f3fc;&#x1f44b;&#x1f3fc; &#x1f389;&#x1f389;我的C语言初阶合集&#xff1a;C语言初阶合集&#xff0c;希望能…

Springboot项目搭建(7)

1.概要 2.Layout主页布局 文件地址&#xff1a;src\views\Layout.vue 2.1 script行为模块 从elementUI中选取图标图案。 <script setup> import {Management,Promotion,UserFilled,User,Crop,EditPen,SwitchButton,CaretBottom } from "element-plus/icons-vue…

cocos creator 3.8 俄罗斯方块Demo 10

这里的表格是横行数列&#xff0c;也就是x是行&#xff0c;y是列&#xff0c;不要当x/y轴看。 1-1012-1012-1-1[-1,0]0[0,-1][0,0][0,1][0,2]0[0,0]11[1,0]22[2,0] -1012-1012-1-1[-1,0]0[0,-1][0,0][0,1][0,2]0[0,0]11[1,0]22[2,0] 2-1012-1012-1[-1,-1][-1,0]-1[-1,-1][-1…

Java安全—原生反序列化重写方法链条分析触发类

前言 在Java安全中反序列化是一个非常重要点&#xff0c;有原生态的反序列化&#xff0c;还有一些特定漏洞情况下的。今天主要讲一下原生态的反序列化&#xff0c;这部分内容对于没Java基础的来说可能有点难&#xff0c;包括我。 序列化与反序列化 序列化&#xff1a;将内存…

【Java 学习】面向程序的三大特性:封装、继承、多态

引言 1. 封装1.1 什么是封装呢&#xff1f;1.2 访问限定符1.3 使用封装 2. 继承2.1 为什么要有继承&#xff1f;2.2 继承的概念2.3 继承的语法2.4 访问父类成员2.4.1 子类中访问父类成员的变量2.4.2 访问父类的成员方法 2.5 super关键字2.6 子类的构造方法 3. 多态3.1 多态的概…

LeetCode—74. 搜索二维矩阵(中等)

仅供个人学习使用 题目描述&#xff1a; 给你一个满足下述两条属性的 m x n 整数矩阵&#xff1a; 每行中的整数从左到右按非严格递增顺序排列。 每行的第一个整数大于前一行的最后一个整数。 给你一个整数 target &#xff0c;如果 target 在矩阵中&#xff0c;返回 true…

uniapp关闭sourceMap的生成,提高编译、生产打包速度

警告信息&#xff1a;[警告⚠] packageF\components\mpvue-echarts\echarts.min.js 文件体积超过 500KB&#xff0c;已跳过压缩以及 ES6 转 ES5 的处理&#xff0c;手机端使用过大的js库影响性能。 遇到问题&#xff1a;由于微信小程序引入了mpvue-echarts\echarts.min.js&…

房屋出租出售预约系统支持微信小程序+H5+APP

核心功能有&#xff1a;新盘销售、房屋租赁、地图找房、小区找房&#xff0c;地铁找房等方式。 地图找房&#xff1a;通过地图标注查看附近房源&#xff0c;方便用户根据地理位置查找合适的房产。二手房资讯&#xff1a;提供租房及二手房市场的相关资讯&#xff0c;帮助用户了…

Axure农业农村数据可视化大屏模板分享

在当今信息技术飞速发展的时代&#xff0c;数据可视化已成为各行各业提升管理效率、优化决策过程的重要手段。Axure作为一款强大的原型设计工具&#xff0c;凭借其高度的自定义能力和丰富的交互设计功能&#xff0c;在农业农村数据可视化领域展现出强大的潜力。本文将详细介绍A…

【分享】PPT打开密码的设置与移除方法

设置和取消PPT的打开密码&#xff0c;是保障和移除文件访问权限的重要操作。以下将分别讲解如何为PPT设置密码&#xff0c;以及如何取消打开密码。 一、如何设置PPT打开密码 方法1&#xff1a; 1、在PowerPoint中打开目标文件&#xff0c;然后点击左上角的“文件”选项。在弹…

网络基础 - 地址篇

一、IP 地址 IP 协议有两个版本&#xff0c;IPv4 和 IPv6IP 地址(IPv4 地址)是一个 4 字节&#xff0c;32 位的正整数&#xff0c;通常使用 “点分十进制” 的字符串进行表示&#xff0c;例如 192.168.0.1&#xff0c;用点分割的每一个数字表示一个字节&#xff0c;范围是 0 ~…

在 Ubuntu 上部署 MediaWiki 开源维基平台

MediaWiki 是一个功能强大的开源维基软件&#xff0c;全球众多组织使用它来创建协作文档网站。本文将指导你如何在 Ubuntu 服务器上安装 MediaWiki&#xff0c;使用 Nginx 作为 Web 服务器&#xff0c;PostgreSQL 作为数据库管理系统。 简介 MediaWiki 是一个灵活且强大的维基…

Spring Boot整合EasyExcel

文章目录 EasyExcel简介Spring Boot整合EasyExcel一、单sheet写操作二、多sheet写数据三、读操作 EasyExcel简介 1、EasyExcel 是一个基于 Java 的简单、省内存的读写 Excel 的开源项目。在尽可能节约内存的情况下支持读写百 M 的 Excel&#xff08;没有一次性将数据读取到内存…

Java 基础面试题

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…