Go协程池gopool源码解析

news2025/1/12 11:59:58

1、gopool简介

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines.

It is an alternative to the go keyword.

gopool的用法非常简单,将曾经我们经常使用的go func(){...}替换为gopool.Go(func(){...})即可

此时gopool将会使用默认的配置来管理你启动的协程,也可以选择针对业务场景配置池子大小以及扩容上限

old:

go func() {
	// do your job
}()

new:

gopool.Go(func(){
	// do your job
})

2、核心数据结构

1)、Pool

Pool是一个定义了协程池的接口,代码如下:

// util/gopool/pool.go
type Pool interface {
	// Name returns the corresponding pool name.
	// 协程池的名称
	Name() string
	// SetCap sets the goroutine capacity of the pool.
	// 设置协程池内goroutine的容量
	SetCap(cap int32)
	// Go executes f.
	// 执行f函数
	Go(f func())
	// CtxGo executes f and accepts the context.
	// 带ctx,执行f函数
	CtxGo(ctx context.Context, f func())
	// SetPanicHandler sets the panic handler.
	// 设置发生panic时调用的函数
	SetPanicHandler(f func(context.Context, interface{}))
}

gopool提供了Pool这个接口的默认实现pool,代码如下:

// util/gopool/pool.go
type pool struct {
	// The name of the pool
	// 协程池的名字
	name string

	// capacity of the pool, the maximum number of goroutines that are actually working
	// 协程池实际工作的goroutine的最大数量
	cap int32
	// Configuration information
	// 配置信息
	config *Config
	// linked list of tasks
	// task队列的元信息,每一个task代表一个待执行的函数
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// Record the number of running workers
	// 当前有多少个worker在运行中,每个worker代表一个goroutine
	workerCount int32

	// This method will be called when the worker panic
	// 协程池中的协程引发的panic会由该函数处理
	panicHandler func(context.Context, interface{})
}

pool数据结构如下图:

2)、task
// util/gopool/pool.go
type task struct {
	// 当前task的ctx
	ctx context.Context
	// 当前task需要执行的函数f
	f func()
	// 指向下一个task的指针
	next *task
}

task是一个链表结构,可以把它理解为一个待执行的任务,包含了当前task需要执行的函数f func()以及指向下一个task的指针

一个协程池pool对应了一组task,pool维护了指向链表的头尾的两个指针:taskHead和taskTail以及链表的长度taskCount和对应的锁taskLock

3)、worker
// util/gopool/worker.go
type worker struct {
	pool *pool
}

一个worker就是逻辑上的一个执行器,它对应到一个协程池pool

当一个worker被唤起,将会开启一个goroutine,不断从pool中的task链表获取任务并执行,代码如下:

// util/gopool/worker.go
func (w *worker) run() {
	go func() {
		for {
			var t *task
			// 操作pool中的task链表前,加锁保证并发安全
			w.pool.taskLock.Lock()
			if w.pool.taskHead != nil {
				// 拿到taskHead准备执行
				t = w.pool.taskHead
				// 更新链表的head以及数量
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
			if t == nil {
				// if there's no task to do, exit
				// 如果前一步拿到的taskHead为空,说明无任务需要执行,清理后返回(关闭goroutine)
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()
			// 执行任务,针对panic会recover,并调用配置的handler
			func() {
				defer func() {
					if r := recover(); r != nil {
						msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
						logger.CtxErrorf(t.ctx, msg)
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}

3、核心API

来看下使用gopool的核心APIGo(f func()),实现如下:

// util/gopool/gopool.go
func Go(f func()) {
	CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
	defaultPool.CtxGo(ctx, f)
}
// util/gopool/pool.go
func (p *pool) CtxGo(ctx context.Context, f func()) {
	// 创建一个task对象,将ctx和待执行的函数赋值
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f
	// 将task插入pool的链表的尾部,更新链表数量
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)
	// The following two conditions are met:
	// 1. the number of tasks is greater than the threshold.
	// 2. The current number of workers is less than the upper limit p.cap.
	// or there are currently no workers.
	// 以下任意条件满足时,创建新的worker并唤起执行
	// 1.待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)
	// 2.无worker运行
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
		// worker数量+1
		p.incWorkerCount()
		// 创建一个新的worker,并把当前pool赋值
		w := workerPool.Get().(*worker)
		w.pool = p
		// 唤起worker执行
		w.run()
	}
}

以下任意条件满足时,会扩容worker:

  1. 待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)
  2. 无worker运行

gopool自行维护一个defaultPool,这是一个默认的pool结构体,在引入包的时候就进行初始化。当我们直接调用gopool.Go()时,本质上是调用了defaultPool的同名方法

// util/gopool/gopool.go
var defaultPool Pool

func init() {
	defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
// util/gopool/config.go
const (
	defaultScalaThreshold = 1
)

type Config struct {
	// threshold for scale.
	// new goroutine is created if len(task chan) > ScaleThreshold.
	// defaults to defaultScalaThreshold.
	// 控制扩容的阈值,一旦待执行的task超过此值,且worker数量未达到上限,就开始启动新的worker
	ScaleThreshold int32
}

func NewConfig() *Config {
	c := &Config{
		ScaleThreshold: defaultScalaThreshold,
	}
	return c
}

defaultPool的名称为gopool.DefaultPool,池子容量一万,扩容阈值为1

当调用gopool.Go()时,gopool就会更新维护的任务链表,并且判断是否需要扩容worker:

  • 若此时已经有很多worker启动(底层一个worker对应一个goroutine),不需要扩容,就直接返回
  • 若判断需要扩容,就创建一个新的worker,并调用worker.run()方法启动,各个worker会异步地检查pool里面的任务链表是否还有待执行的任务,如果有就执行

gopool中三个角色的定位:

  • task是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构
  • worker是一个实际执行任务的执行器,它会异步启动一个goroutine执行协程池里面未执行的task
  • pool是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的worker

gopool核心实现原理如下图:

4、使用sync.Pool进行性能优化

gopool中多次使用了sync.Pool来池化对象的创建,复用woker和task对象

task池化:

// util/gopool/pool.go
var taskPool sync.Pool

func init() {
	taskPool.New = newTask
}

func newTask() interface{} {
	return &task{}
}

func (t *task) Recycle() {
	t.zero()
	taskPool.Put(t)
}

worker池化:

// util/gopool/worker.go
var workerPool sync.Pool

func init() {
	workerPool.New = newWorker
}

func newWorker() interface{} {
	return &worker{}
}

func (w *worker) Recycle() {
	w.zero()
	workerPool.Put(w)
}

参考:

解析 Golang 协程池 gopool 设计与实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1570178.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java基础知识总结 | 第十篇】HashSet底层实现原理

文章目录 10.HashSet底层实现原理10.1HashSet特点10.2HashSet源码10.3 add流程10.4总结 10.HashSet底层实现原理 10.1HashSet特点 存储对象&#xff1a;HashSet 存储对象采用哈希表的方式&#xff0c;它不允许重复元素&#xff0c;即集合中不会包含相同的元素。当向 HashSet …

Linux_进程信号_7

文章目录 1.什么是信号2.信号列表3.信号处理常见方式4.信号的存储5.信号产生前-中-后1.信号产生前2.信号产生中 6产生信号1.signal2.kill3.raise4.abort5.alarm6.硬件异常 7.core dump8.信号产生中1. sigset_t&#xff08;数据类型&#xff09;2.信号集操作函数1.sigprocmask2.…

Vue3_2024_7天【回顾上篇watch常见的后两种场景】___续

Vue3中监听多条数据的两种使用 1.watch【使用上一章写法&#xff0c;监听两个属性&#xff0c;然后执行相应操作…】 2.watchEffect【相对于使用watch&#xff0c;watchEffect默认页面初始加载&#xff0c;有点类似加配置&#xff1a;立即执行 immediate】 代码&#xff1a; …

Python:百度AI开放平台——OCR图像文字识别应用

一、注册百度AI开放平台 使用百度AI服务的步骤为&#xff1a; 注册&#xff1a;注册成为百度AI开放平台开发者&#xff1b;创建AI应用&#xff1a;在百度API开放平台上创建相关类型的的AI应用&#xff0c;获得AppID、API Key和Secret Key&#xff1b;调用API&#xff1a;调用…

Word中插入Endnote参考文献时显示乱码

近期在写文章需要插入参考文献&#xff0c;使用Endnote插入时显示乱码&#xff0c;如下图所示&#xff1a; 文章末尾显示{ADDIN EN REFILIST } 解决方法 在网上找了诸多方法尝试也没有解决&#xff0c;最终找到一篇博客介绍了一种方法&#xff1a; word选项—高级&#xff1…

苍穹外卖07(缓存菜品,SpringCache,缓存套餐,添加购物车菜品和套餐多下单,查看购物车,清除购物车,删除购物车中一个商品)

目录 一、缓存菜品 1 问题说明 2 实现思路 3 代码开发&#xff1a;修改DishServiceImpl 4 功能测试 二、SpringCache 1. 介绍 2. 使用语法 1 起步依赖 2 使用要求 3 常用注解 4 SpEL表达式(了解备用) 5 步骤小结 3.入门案例 1 准备环境 2 使用入门 1 引导类上加…

四、MySQL读写分离之MyCAT

一、读写分离概述 1、什么是读写分离&#xff1a; 读写分离&#xff1a;就是将读写操作分发到不同的服务器&#xff0c;读操作分发到对应的服务器 &#xff08;slave&#xff09;&#xff0c;写操作分发到对应的服务器&#xff08;master&#xff09; ① M-S (主从) 架构下&…

【测试篇】测试用例

文章目录 前言具体设计测试用例等价类边界值场景设计法判定表&#xff08;因果图&#xff09;正交排列&#xff08;用的非常少&#xff09;错误猜测法 前言 什么是测试用例&#xff1f;&#xff1f; 测试用例是针对软件系统或应用程序的特定功能或场景编写的一组步骤&#xf…

记一次Cannot deploy POJO class [xxx$$EnhancerBySpringCGLIB$$xxx]的错误

最近项目上需要使用websocket做服务端&#xff0c;那好说啊&#xff0c;直接springboot集成的websocket 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId><versi…

开源推荐榜【Pear Admin Flask 用python来创建后台管理系统】

最新技术高效快速开发&#xff0c;前后端分离模式&#xff0c;开箱即用。 核心模块包括&#xff1a;用户、角色、职位、组织机构、菜单、字典、日志、多应用管理、文件管理、定时任务等功能。 代码量少、学习简单、功能强大、轻量级、易扩展&#xff0c;轻松开发从现在开始&…

【OpenCV】图像像素的遍历

1 前言 介绍两种遍历像素的方法&#xff08;非指针、指针&#xff09;。注意&#xff1a;.at() .ptr()的作用、用法。相关API&#xff1a; Mat对象.ptr() Mat对象.at() 2 代码及内容 #include "iostream" #include "opencv2/opencv.hpp"using namespac…

聊聊测试用例评审流程

测试人员将需求熟悉完成后&#xff0c;开始编写相应的测试用例&#xff0c;待测试用例编写完成后只是测试用例完成前的第一步&#xff0c;后边的流程需要组织线上或线下评审会议等等。 首先要了解测试用例评审的最终目的是什么&#xff1a;提高测试用例的质量和覆盖率&#xff…

notification+Android笔记

notification通知应用UI之外的消息并显示即推送&#xff1b; NotificationManager负责管理通知&#xff0c;例如显示取消&#xff0c;删除等&#xff1b; import android.app.Notification; import android.app.NotificationChannel; import android.app.NotificationManager;…

从0开始搭建基于VUE的前端项目(三) Vuex的使用与配置

准备与版本 vuex 3.6.2(https://v3.vuex.vuejs.org/zh/)概念 vuex是什么? 是用作 【状态管理】的 流程图如下 state 数据状态,成员是个对象 mapState 组件使用this.$store.state.xxx获取state里面的数据 getters 成员是个函数,方便获取state里面的数据,也可以加工数据 ma…

HackTheBox-Mist

整体思路 端口扫描->Pluck CMS组件文件读取漏洞->文件上传获取shell->创建指向exe的快捷方式来提权-> 信息收集&端口利用 namp -sSVC 10.10.11.17目标只开放了80端口&#xff0c;将mist.htb加入到hosts文件后&#xff0c;访问mist.htb Pluck CMS文件读取 在…

RH850P1X芯片学习笔记-Clocked Serial Interface H (CSIH)

文章目录 Features of RH850/P1x-C CSIHUnitsRegister Base AddressClock SupplyInterrupt RequestsHardware ResetExternal Input/Output Signals数据一致性检查 OverviewFunctional OverviewFunctional Overview DescriptionBlock Diagram RegistersList of RegistersCSIHnCT…

Leetcode - 127双周赛

目录 一&#xff0c;3095. 或值至少 K 的最短子数组 I 二&#xff0c;3096. 得到更多分数的最少关卡数目 三&#xff0c;3097. 或值至少为 K 的最短子数组 II 四&#xff0c;3098. 求出所有子序列的能量和 一&#xff0c;3095. 或值至少 K 的最短子数组 I 本题需要知道一个知…

武汉星起航电子商务有限公司挂牌展示,为合作伙伴提供全方位支持

随着跨境电商领域市场竞争愈演愈烈&#xff0c;武汉星起航亚马逊一站式孵化平台悄然崭露头角。从2017年起&#xff0c;武汉星起航便立足亚马逊自营店铺&#xff0c;积累了丰富的实战运营经验。2020年正式成立后&#xff0c;公司以跨境电商为核心&#xff0c;凭借专业的运营团队…

复杂度的讲解

1.算法效率 如何衡量一个算法的好坏&#xff1f;从两个维度&#xff0c;时间和空间&#xff08;算法运行的快慢&#xff0c;消耗的空间大不大&#xff09;。因为计算机硬件领域的高速发展&#xff0c;如今计算机的存储量已经达到了一个很高的程度&#xff0c;所以现在我们一般…

12种常见的软件架构风格

什么是软件架构&#xff1f; 软件架构是定义软件系统的高级结构和组织的过程。它涉及识别和选择正确的组件&#xff0c;决定它们之间如何交互&#xff0c;以及确定它们应该如何组织以实现特定的目标。软件架构的目标是创建一个可维护、可扩展和安全的系统&#xff0c;能够满足…