go 协程池的实现

news2024/11/19 14:50:00

使用场景

这次需求是做一个临时的数据采集功能,为了将积压的数据快速的消耗完,但是单一的脚本消耗的太慢,于是乎就手写了一个简单的协程池:

  1. 为了能加快数据的收集速度
  2. 为了稳定协程的数量,让脚本变得稳定

设计图如下

在这里插入图片描述
协程池中提供了三个方法:

  1. 一个是Addjob用来将任务加入到任务池中
  2. Do 是用来消耗任务池中的任务
  3. HandleErrors 用来获取到错误信息
  4. Stop 是当脚本停止以后,不会立刻停止而是等待所有的人物消耗光在停止

代码如下

该协程池是借用了go扩展库中的semaphore来实现的。

  1. semaphore 信号量是一种同步机制,用于控制对共享资源的访问,常用于限制可以同时访问某一资源或资源池的线程数量。
  2. 我使用的是Acquire函数来实现的,Acquire 当资源访问量达到上限时会被阻塞,直到有协程执行完成,所以我们这里需要对Acquire的上下文设置超时时间,防止我们的任务出现死任务无法退出,从而导致整个协程池堵死。
  3. 我们在任务执行完成后要通过Release来释放资源,防止我们池子越变越小。
package pool

import (
	"context"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

type GoPool struct {
	MaxNum int
	Jobs   chan func() error
	sem    *semaphore.Weighted
	wg     *sync.WaitGroup
	Errs   chan error
}

func NewGoPool(num int) *GoPool {
	return &GoPool{
		MaxNum: num,
		Jobs:   make(chan func() error, num),
		sem:    semaphore.NewWeighted(int64(num)),
		wg:     &sync.WaitGroup{},
		Errs:   make(chan error, num),
	}
}

func (g *GoPool) Do() {

	go g.gAcquire()
}

func (g *GoPool) AddJob(f func() error) {
	g.Jobs <- f
}

func (g *GoPool) gAcquire() {

	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		select {
		case job, ok := <-g.Jobs:
			if !ok {
				cancel()
				return
			}
			g.wg.Add(1)
			if err := g.sem.Acquire(ctx, 1); err != nil {
				// g.Errs <- err
				g.wg.Done()
				cancel() // 确保在退出前取消context
				break
			}
			go func() {
				defer g.sem.Release(1)
				defer g.wg.Done()
				if err := job(); err != nil {
					g.Errs <- err
					return
				}
			}()
		case <-ctx.Done():
			return
		default:
			continue
		}
	}
}

func (g *GoPool) Stop() {
	close(g.Jobs)
	g.wg.Wait()
	close(g.Errs)
}


func (g *GoPool) HandleErrors(handler func(error)) {
	for err := range g.Errs {
		handler(err)
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1955166.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JUC】线程局部变量ThreadLocal

文章目录 ThreadLocal简介面试题是什么&#xff1f;能干吗&#xff1f;API介绍initialValue方法&#xff08;不推荐&#xff09;withInitial方法&#xff08;推荐&#xff09;remove ThreadLocal入门案例原始需求需求变更线程池 总结 ThreadLocal源码分析Thread、ThreadLocal、…

区块链——hardhat使用

一、引入hardhat yarn add --dev hardhat // 引入验证合约的插件 yarn add --dev nomicfoundation/hardhat-verify 二、创建hardhat项目 yarn hardhat 三、编写我们的合约 四、编译我们的合约 yarn hardhat compile 五、编写脚本部署合约以及验证合约 // 获取hardhat环境对象 c…

Flutter大型项目架构:私有组件包管理

随着项目功能模块越来越多&#xff0c;怎么去管理这些私有组件包是一个不得不面对的问题&#xff0c;特别对于团队开发来讲&#xff0c;一些通用的公共组件往往会在多个项目间使用&#xff0c;多的有几十个&#xff0c;每个组件包都有有自己的版本&#xff0c;组件包之间还有依…

AI的欺骗游戏:揭示多模态大型语言模型的易受骗性

人工智能咨询培训老师叶梓 转载标明出处 多模态大型语言模型&#xff08;MLLMs&#xff09;在处理包含欺骗性信息的提示时容易生成幻觉式响应。尤其是在生成长响应时&#xff0c;仍然是一个未被充分研究的问题。来自 Apple 公司的研究团队提出了MAD-Bench&#xff0c;一个包含8…

漏洞复现-F6-11泛微-E-Cology-SQL

本文来自无问社区&#xff0c;更多漏洞信息可前往查看http://www.wwlib.cn/index.php/artread/artid/15575.html 0x01 产品简介 泛微协同管理应用平台e-cology是一套企业级大型协同管理平台 0x02 漏洞概述 该漏洞是由于泛微e-cology未对用户的输入进行有效的过滤&#xff0…

CentOS_7.9历史版本官网下载

文章目录 1. 官网下载1.1. 打开官网1.2. download1.3. Older Versions-click here1.4. RPMs1.5. Vault mirror1.6. 7.9.2009/1.7. isos1.8. x86_641.9. 选择安装包下载 2. 阿里云镜像站下载 1. 官网下载 下载地址(如果下载地址已失效&#xff0c;参考下面的步骤) iso是安装包…

【前端】一文带你了解 CSS

文章目录 1. CSS 是什么2. CSS 引入方式2.1 内部样式2.2 外部样式2.3 内联样式 3. CSS 常见选择器3.1 基础选择器3.1.1 标签选择器3.1.2 类选择器3.1.3 id 选择器3.1.4 通配符选择器 3.2 复合选择器3.2.1 后代选择器 4. CSS 常用属性4.1 字体相关4.2 文本相关4.3 背景相关4.4 设…

基于sklearn的机器学习 — 决策树与随机森林

基于树的学习算法是一种广泛而流行的非参数、有监督的分类和回归方法。 基于树的学习算法的基础是决策树&#xff08;decision tree&#xff09;&#xff0c;它将一系列决策规则串联起来&#xff0c;看起来像一棵倒立的树&#xff0c;第一条决策规则位于树顶&#xff0c;称之为…

第十四篇——军争篇:怎样在行军中设计战场

目录 一、背景介绍二、思路&方案三、过程1.思维导图2.文章中经典的句子理解3.学习之后对于投资市场的理解4.通过这篇文章结合我知道的东西我能想到什么&#xff1f; 四、总结五、升华 一、背景介绍 通过不利的战场&#xff0c;用方式方法&#xff0c;让战场逐渐转化成对自…

[JS]同事:这次就算了,下班回去赶紧补补内置函数,再犯肯定被主管骂

【版权声明】未经博主同意&#xff0c;谢绝转载&#xff01;&#xff08;请尊重原创&#xff0c;博主保留追究权&#xff09; https://blog.csdn.net/m0_69908381/article/details/140754278 出自【进步*于辰的博客】 参考笔记一&#xff0c;P10.4、P13.2&#xff1b;笔记三&am…

城市交通工具目标检测数据集自行车、公交车、小汽车、行人

数据整理不易&#xff0c;下载地址点这里&#xff1b; yolo格式数据集之交通工具检测5种&#xff1b; 数据集已划分好|可以直接使用|yolov5|v6|v7|v8|v9|v10通用&#xff1b; 本数据为交通工具检测检测数据集&#xff0c;数据集数量如下&#xff1a; 总共有:6633张 训练集&…

尚庭公寓(四)

房间基本属性管理共有五个接口&#xff0c;分别是**保存或更新属性名称**、**保存或更新属性值**、**查询全部属性名称和属性值列表**、**根据ID删除属性名称**、**根据ID删除属性值**。下面逐一是实现。 首先在AttrController中注入AttrKeyService和AttrValueService&#xf…

计数器与阻塞队列

目录 一&#xff1a;阻塞队列 模拟阻塞队列 二&#xff1a;线程池&#xff1a; 三&#xff1a;计数器&#xff1a; 定时器模拟实现 一&#xff1a;阻塞队列 阻塞队列是在原有的普通队列上做了扩充&#xff0c;标准库中原有的队列和子类都是线程不安全的。 1.线程安全 2.…

OSMDroidOfflineDemo源码调试记录

文章目录 源码下载环境配置尝试不同离线加载遇到的问题 尝试安卓端加载离线地图&#xff0c;下载了使用osmdroid的离线版项目源码&#xff0c;更改JDK环境、gradle环境&#xff0c;一顿操作下来&#xff0c;踉踉跄跄的把程序跑起来了&#xff0c;但是离线的地图一直加载不出来。…

49.TFT_LCD液晶屏驱动设计与验证(2)

&#xff08;1&#xff09;Visio视图&#xff1a; &#xff08;2&#xff09;控制模块Verilog代码&#xff1a; module tft_ctrl(input clk_33M ,input reset_n ,input [23:0] data_in ,output [9:0] hang…

如何使用 SQLite ?

SQLite 是一个轻量级、嵌入式的关系型数据库管理系统&#xff08;RDBMS&#xff09;。它是一种 C 库&#xff0c;实现了自给自足、无服务器、零配置、事务性 SQL 数据库引擎。SQLite 的源代码是开放的&#xff0c;完全在公共领域。它被广泛用于各种应用程序&#xff0c;包括浏览…

Python | ValueError: could not convert string to float: ‘example’

Python | ValueError: could not convert string to float: ‘example’ 在Python编程中&#xff0c;类型转换是一个常见的操作。然而&#xff0c;当尝试将一个字符串转换为浮点数时&#xff0c;如果字符串的内容不是有效的浮点数表示&#xff0c;就会遇到“ValueError: could…

【python_将一个列表中的几个字典改成二维列表,并删除不需要的列】

def 将一个列表中的几个字典改成二维列表(original_list,headersToRemove_list):# 初始化一个列表用于存储遇到的键&#xff0c;保持顺序ordered_keys []# 遍历data中的每个字典&#xff0c;添加其键到ordered_keys&#xff0c;如果该键还未被添加for d in original_list:for …

SpringCloud之@FeignClient()注解的使用方式

FeignClient介绍 FeignClient 是 Spring Cloud 中用于声明一个 Feign 客户端的注解。由于SpringCloud采用分布式微服务架构&#xff0c;难免在各个子模块下存在模块方法互相调用的情况。比如订单服务要调用库存服务的方法&#xff0c;FeignClient()注解就是为了解决这个问题的…

Vim 文本编辑工具

Vim 基础命令 一、Vim 命令速查 Vim 是一款功能强大的文本编辑器&#xff0c;广泛应用于Linux系统中。以下是一些基础但非常有用的Vim命令&#xff0c;它们将帮助你更高效地使用Vim。 使用单个字母键通常需要进一步的输入以形成完整命令。特殊符号用来表示操作的位置。 命令…