Golang协程池ants库的学习、使用及源码阅读,协程池与GMP模型关系的理解

news2025/1/10 21:07:58

前言

在工作时遇到了一个需要使用ants协程池的地方,因此顺带来学习一下他的原理。

在这里插入图片描述

协程池

Golang的资源还是偏少一些…因此先简单的参考学习了一下线程池。

类似于Java中的线程池,协程池也是为了减少协程频繁创建、销毁所带来资源消耗的问题。按默认每个goroutine 8kb内存来算,几十万个goroutine就会占满8Gb内存。同时,由于goroutine的结束需要等待自身运行结束才可以销毁,所以也可能出现goroutine泄露的问题。因此需要使用协程池,来进行管理。

本文是ants协程池的学习,其他优秀的协程池日后有机会再去学习。

ants库对性能的提升

直接引用作者的结论“用ants的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。”

在goroutine越多时候,提升越明显。具体请参阅作者文档中给出的性能测试部分。——https://github.com/panjf2000/ants/blob/dev/README_ZH.md

ants库工作流程

这个是作者在文档中所给出的流程图。在原文档中还提供了动态图,但我的电脑上他们没动…所以我也就不搬运了。具体还是请参阅作者文档。

在这里插入图片描述

当一个任务被丢尽协程池后,大致分为以下几步:
1、判断是否有可用的worker。若有,分配任务到worker,并执行即可。
2、若无可用的worker,判断是否达到容量上限。若未达到上限,新增一个worker并分配执行即可。
3、若已达到上限,则判断是否允许阻塞。若不允许,则直接返回nil即可。
4、若允许阻塞,则阻塞至有可用worker,再分配执行即可。
5、对于一个执行完成任务的worker,再放回工作池。

ants库源码对照阅读

由于笔者学术不精且文笔欠佳,因此此部分仅是简单阅读记录,一篇详细且高质量的源码阅读请参阅Go 每日一库之 ants(源码赏析)。此文阅读、解析的十分详细。

新建Pool

Pool所用到的的数据结构如下。

type Pool struct {
	// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
	// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
	// which submits a new task to the same pool.
	capacity int32

	// running is the number of the currently running goroutines.
	running int32

	// lock for protecting the worker queue.
	lock sync.Locker

	// workers is a slice that store the available workers.
	workers workerQueue

	// state is used to notice the pool to closed itself.
	state int32

	// cond for waiting to get an idle worker.
	cond *sync.Cond

	// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
	workerCache sync.Pool

	// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
	waiting int32

	purgeDone int32
	stopPurge context.CancelFunc

	ticktockDone int32
	stopTicktock context.CancelFunc

	now atomic.Value

	options *Options
}

其中大致如下几个参数:
capacity:池容量,表示ants最多能创建的 goroutine 数量。负值意味着池的容量是无限的,使用无限池是为了避免由于嵌套使用池而导致的无限阻塞的潜在问题:提交一个任务到池,该池将一个新任务提交到相同的池。
running:是当前正在运行的goroutines的数量。
workers:存储可用worker的切片。works是一个workerQueue类型的接口,位于workerQueue.go这个文件中。
state:记录池子当前是否已关闭(CLOSED)。
waiting:已经被pool.Submit()阻塞的协程数量,由pool.lock保护。
lock:锁。
cond:条件变量。处理任务等待和唤醒。
blockingNum:阻塞等待的任务数量。
其他具体的见源码中

// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {
	if size <= 0 {
		size = -1
	}

	opts := loadOptions(options...)

	if !opts.DisablePurge {
		if expiry := opts.ExpiryDuration; expiry < 0 {
			return nil, ErrInvalidPoolExpiry
		} else if expiry == 0 {
			opts.ExpiryDuration = DefaultCleanIntervalTime
		}
	}

	if opts.Logger == nil {
		opts.Logger = defaultLogger
	}

	p := &Pool{
		capacity: int32(size),
		lock:     syncx.NewSpinLock(),
		options:  opts,
	}
	p.workerCache.New = func() interface{} {
		return &goWorker{
			pool: p,
			task: make(chan func(), workerChanCap),
		}
	}
	if p.options.PreAlloc {
		if size == -1 {
			return nil, ErrInvalidPreAllocSize
		}
		p.workers = newWorkerQueue(queueTypeLoopQueue, size)
	} else {
		p.workers = newWorkerQueue(queueTypeStack, 0)
	}

	p.cond = sync.NewCond(p.lock)

	p.goPurge()
	p.goTicktock()

	return p, nil
}

NewPool就是新建一个Pool,其接收的参数是(size int, options ...Option) ,第一个是容量,其他可选项。

通过Submit进行任务提交

此部分源码如下

func (p *Pool) Submit(task func()) error {
	if p.IsClosed() {
		return ErrPoolClosed
	}

	w, err := p.retrieveWorker()
	if w != nil {
		w.inputFunc(task)
	}
	return err
}

首先判断池子是否关闭,关闭则直接err
接下来通过retrieveWorker来判断是否有空闲的worker
w!=nil 即有空闲worker,则通过w.inputFunc(task)新增一个任务。

w.inputFunc(task)部分如下

func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}

是把任务提交到了一个goWorker结构体中的task chan func()。这个goworkerworker接口的实现,他其中chan的任务会进行run

通过retrieveWorker判断空闲worker

此部分源码如下,其所返回的是一个可用的worker


// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w worker, err error) {
	p.lock.Lock()

retry:
	// First try to fetch the worker from the queue.
	if w = p.workers.detach(); w != nil {
		p.lock.Unlock()
		return
	}

	// If the worker queue is empty, and we don't run out of the pool capacity,
	// then just spawn a new worker goroutine.
	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		p.lock.Unlock()
		w = p.workerCache.Get().(*goWorker)
		w.run()
		return
	}

	// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
		p.lock.Unlock()
		return nil, ErrPoolOverload
	}

	// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
	p.addWaiting(1)
	p.cond.Wait() // block and wait for an available worker
	p.addWaiting(-1)

	if p.IsClosed() {
		p.lock.Unlock()
		return nil, ErrPoolClosed
	}

	goto retry
}

首先通过p.workers.detach()判断是否有空闲的worker。若有,则直接返回可用的worker

若无,通过

if capacity := p.Cap(); capacity == -1 || capacity > p.Running() 

这一个判断来判断是否耗尽了池容量。若没有耗尽,则通过p.workerCache.Get().(*goWorker)生成一个新的工作goroutine,并返回workers

继续执行则是“没有空闲worker且池容量已满”的情况。便通过

	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) 

判断,是否处于非阻塞模式,或者待处理的呼叫者数量达到最大值,若是,则退出返回nil

反之,通过

	p.addWaiting(1)
	p.cond.Wait() // block and wait for an available worker
	p.addWaiting(-1)

进行阻塞等待可用的即可。

此部分整体通过一个retry:标签和goto retry来进行控制,通过一个lock保证了安全。具体看作者源码部分,此处不搬运描述。

run部分

chan中的任务会依次去runrun的源码部分如下:


// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *goWorker) run() {
	w.pool.addRunning(1)
	go func() {
		defer func() {
			w.pool.addRunning(-1)
			w.pool.workerCache.Put(w)
			if p := recover(); p != nil {
				if ph := w.pool.options.PanicHandler; ph != nil {
					ph(p)
				} else {
					w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
				}
			}
			// Call Signal() here in case there are goroutines waiting for available workers.
			w.pool.cond.Signal()
		}()

		for f := range w.task {
			if f == nil {
				return
			}
			f()
			if ok := w.pool.revertWorker(w); !ok {
				return
			}
		}
	}()
}

简单来说,这部分是开协程,并在其中:增减相关计数、运行函数。相当于是把goroutine又交给了底层GMP模型去进行处理。

ants库的使用

根据作者文档的描述,主要提供了下述几种功能“任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool”。
下面是我一次使用ants库记录。

https://blog.csdn.net/Ws_Te47/article/details/135484767

此次使用只是用到了最基础的功能,新建Pool、任务提交、释放Pool。
一个更详细的使用demo请见——https://darjun.github.io/2021/06/03/godailylib/ants/

关于GMP模型和协程池

在昨天的初步学习中,曾出现过一个疑惑,GMP模型和协程池,到底是什么关联?我疑惑在他们都是用于协程的分配与控制。协程池通过一系列的判断来判断是否运行,但GMP不也有队列去分配吗?

今天突然恍然大悟,在学习完源码后也进一步清晰了他。

GMP模型是用于协程分配给线程这一个过程的,他的起点是“一个新建好的协程”,并将这个协程去进行一系列的排队、抢占调度等 操作,最终终点是分配到线程,并最终经CPU进行执行。

但协程池的主要目的是goroutine的复用,减少因频繁新建、销毁goroutine所带来的性能损耗。他的起点是“一个需要通过goroutine执行的方法”,通过一系列的判断、分配,最终的终点是在run方法中去启用一个goroutine。

所以可以说,协程池的终点是GMP的起点。一个任务先通过协程池去挑选合适的时机,分配到对应的goroutine;再通过GMP模型,去分配给对应的线程,最终分配到CPU进行执行。

参考资源

官方文档,唯一真神…

Java线程池实现原理及其在美团业务中的实践。

Go 每日一库之 ants

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1371981.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【我想开发一个小程序,大概需要多少钱?】

小程序开发为什么报价差距很大&#xff1f;主要是因为小程序的实现方法和功能模型不同。 小程序的实现方法&#xff1a; 实现方法主要分为SAAS小程序、定制小程序和第三方平台小程序。不同的实现方法价格都是不一样的&#xff0c;大概的区间如下&#xff1a; SAAS小程序和第三…

ChatGPT+Python近红外光谱数据分析及机器学习与深度学习建模进阶应用

目录 第一章 ChatGPT4入门基础 第二章 ChatGPT4 提示词使用方法与技巧 第三章 ChatGPT4助力信息检索与总结分析 第四章 ChatGPT4助力论文写作与投稿 第五章 ChatGPT4助力Python入门基础 第六章 ChatGPT4助力近红外光谱数据预处理 第七章 ChatGPT4助力多元线性回归近红外…

leetcode经典【双指针】例题

删除有序数组中的重复项&#xff1a; https://leetcode.cn/problems/remove-duplicates-from-sorted-array/ 解题思路&#xff1a; 首先注意数组是有序的&#xff0c;那么重复的元素一定会相邻。 注: 要求删除重复元素&#xff0c;实际上就是将不重复的元素移到数组的左侧。 考…

ubuntu20.04安装cuda11.4以及cudnn

系统&#xff1a;ubuntu20.04硬件配置&#xff1a;GPU3080、CPU未知通过《软件和更新》在附加驱动选项中添加了驱动&#xff1a; 1.检查自己电脑支持的cuda nvidia-smi4. 下载cuda11.4.2 wget https://developer.download.nvidia.com/compute/cuda/11.4.2/local_installers/c…

典型的无人机打击技术

无人机打击技术主要指的是用于中和、摧毁或干扰无人机&#xff08;UAV&#xff09;的各种技术手段。 这些技术随着无人机的广泛使用而迅速发展&#xff0c;特别是在军事和安全领域。下面是一些主要的无人机打击技术及其原理&#xff1a; 射频干扰&#xff08;RF Jamming&#x…

算法通关村番外篇-LeetCode编程从0到1系列六

大家好我是苏麟 , 今天带来LeetCode编程从0到1系列六 . 链表相关的题目 , 也是面试热题 . 大纲 21. 合并两个有序链表206. 反转链表 21. 合并两个有序链表 描述 : 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 题目 : Le…

苹果手机设置静态IP教程,长效IP代理有什么帮助?

随着智能手机的普及&#xff0c;越来越多的人开始使用苹果手机。然而&#xff0c;有时候我们在使用苹果手机时&#xff0c;可能会遇到需要设置静态IP的情况。那么&#xff0c;如何在苹果手机上设置静态IP呢&#xff1f;下面就为大家详细介绍一下。 1、打开苹果手机&#xff0c;…

LINUX基础第十一章:文件系统与日志服务管理

目录 一.LINUX文件系统 1.inode表和block &#xff08;1&#xff09;inode &#xff08;2&#xff09;block 2.查看inode号命令 3.Linux系统文件三种主要时间属性 4.磁盘空间还剩余很多但无法继续创建文件 5.inode大小 二.日志 1.日志保存位置 2.日志文件的分类 &am…

关于java的稀疏数组

关于java的稀疏数组 我们在前面的文章中了解了冒泡排序和优化冒泡排序&#xff0c;在本篇文章中我们来介绍一下稀疏数组&#xff0c;我们学会了可以自己动手试一试&#x1f600; 稀疏数组 在介绍稀疏数组之前&#xff0c;我们先来了解一下五子棋。 我们这里有一个11 x 11的棋…

新手练习项目 4:简易2048游戏的实现(C++)

名人说&#xff1a;莫听穿林打叶声&#xff0c;何妨吟啸且徐行。—— 苏轼《定风波莫听穿林打叶声》 Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#xff09; 目录 一、效果图二、代码&#xff08;带注释&#xff09;三、说明 一、效果图 二、代码&#xff08;带…

深度学习|交叉熵

文章目录 什么是交叉熵如何构造信息量的函数关于 C 1 C_1 C1​参数的选择关于 C 2 C_2 C2​参数的选择 一个系统的熵如何比较两个系统的熵交叉熵在神经网络中的应用参考 什么是交叉熵 熵是用来衡量一个系统的混乱程度&#xff0c;混乱程度也其实代表着整个系统内部的不确定性。…

Redis缓存使用问题

数据一致性 只要使用到缓存,无论是本地内存做缓存还是使用 redis 做缓存,那么就会存在数据同步的问题。 以 Tomcat 向 MySQL 中写入和删改数据为例,来解释数据的增删改操作具体是如何进行的。 我们分析一下几种解决方案, 1、先更新缓存,再更新数据库 2、先更新数据库,…

搭建Docker私有镜像服务器

一、前言 1、本文主要内容 基于Decker Desktop&Docker Registry构建Docker私有镜像服务器测试在CentOS 7上基于Docker Registry搭建公共Docker镜像服务器修改Docker Engine配置以HTTP协议访问Docker Registry修改Docker Engine配置通过域名访问Docker Registry配置SSL证书…

1分钟速通Webservice服务端和客户端

服务端实现&#xff1a; 我们随便实现一个简单服务&#xff0c;客户请求我们的服务&#xff0c;我们给客户返回响应的信息 WebService public class HelloServiceImpl implements HelloService {Overridepublic String getString(String name) {return "hello," n…

使用numpy处理图片——基础操作

大纲 准备工作图片像素大小修改透明度 numpy是一款非常优秀的处理多维数组的Python基础包。在现实中&#xff0c;我们最经常接触的多维数组相关的场景就是图像处理。本系列将通过若干篇对图像处理相关的探讨&#xff0c;来介绍numpy的使用方法&#xff0c;以获得直观的体验。 本…

主食冻干哪款好?十大放心主食冻干名单推荐

作为养猫的人&#xff0c;我们都知道每天最担心的事情就是如何为心爱的猫咪选择一款高品质的猫粮。我们都希望为猫咪提供最好的营养&#xff0c;让它们健康快乐地成长。然而&#xff0c;近期的一些事件&#xff0c;如百利猫粮生虫和VE主食冻干掰开有虫&#xff0c;让我们不得不…

【开源商城推荐-LGPL-3.0】ts-mall 聚惠星商城

dts-shop: 聚惠星商城 DTS-SHOP&#xff0c;基于 微信小程序 springboot vue 技术构建 &#xff0c;支持单店铺&#xff0c;多店铺入驻的商城平台。项目包含 微信小程序&#xff0c;管理后台。基于java后台语言&#xff0c;已功能闭环&#xff0c;且达到商用标准的一套项目体…

AI文本朗读应用(二)

调用api实现TTS 注&#xff1a;如对api的使用有任何疑问&#xff0c;可以查阅文本转语音 REST API。 选择右侧“解决方案资源管理器”中的“TTS_Demo”&#xff0c;右键选择“添加”->“新建项”。 选择“类”&#xff0c;名称为“Authentication.cs”&#xff0c;点击“添…

【漏洞复现】锐捷EG易网关cli.php后台命令执行漏洞

Nx01 产品简介 锐捷EG易网关是一款综合网关&#xff0c;由锐捷网络完全自主研发。它集成了先进的软硬件体系架构&#xff0c;配备了DPI深入分析引擎、行为分析/管理引擎&#xff0c;可以在保证网络出口高效转发的条件下&#xff0c;提供专业的流控功能、出色的URL过滤以及本地化…

14:00面试,14:07就出来了,问的问题有点变态。。。

前言 刚从小厂出来&#xff0c;没想到在另一家公司我又寄了。 在这家公司上班&#xff0c;每天都要加班&#xff0c;但看在钱给的比较多的份上&#xff0c;也就不太计较了。但万万没想到一纸通知&#xff0c;所有人不准加班了&#xff0c;不仅加班费没有了&#xff0c;薪资还…