深入理解 Go 语言 Goroutine 的工作原理

news2024/9/24 9:17:55

一、设计思路

1、设计描述
  • 启动服务之时先初始化一个 Goroutine Pool 池,这个 Pool 维护了一个类似栈的 LIFO 队列,里面存放负责处理任务的 Worker
  • 然后在 client 端提交 task 到 Pool 中之后,在 Pool 内部,接收 task 之后的核心操作是
    • 检查当前 Worker 队列中是否有可用的 Worker,如果有,取出执行当前的 task
    • 没有可用的 Worker,判断当前在运行的 Worker 是否已超过该 Pool 的容量
    • 每个 Worker 执行完任务之后,放回 Pool 的队列中等待

2、Pool struct 
type sig struct{}

type f func() error

// pool从客户端获取任务,它限制 goroutines 总数,并且回收再使用
type Pool struct {
	capacity int32                   // 协程池容量
	running int32                    // 正在运行的goroutine数量
	expiryDuration time.Duration     // 为每个worker设置一个过期时间
	workers []*Worker                // 存放空闲 worker,请求进入 Pool先检查workers若有则取出绑定任务执行
	release chan sig                 // 当关闭该 Pool 支持通知所有 worker 退出运行以防 goroutine 泄露
	lock sync.Mutex                  // 同步操作锁
	once sync.Once                   // 确保 Pool 关闭操作只会执行一次
}
3、初始化 Pool 并启动定期清理过期 worker 任务
//新建一个线程池实例
func NewPool(size int) (*Pool,error) {
    return NewTimingPool(size,DefaultCleanIntervalTime)
}

//产生一个带有自定义定时器的线程池实例
func NewTimingPool(size, expiry int) (*Pool,error) {
    if size <= 0{
        return nil,ErrInvalidPoolSize
    }
    if expiry <= {
        return nil,ErrInvalidPoolExpiry
    }

   p := &Pool{
		capacity:       int32(size),
		freeSignal:     make(chan sig, math.MaxInt32),
		release:        make(chan sig, 1),
		expiryDuration: time.Duration(expiry) * time.Second,
	}
	// 启动定期清理过期worker任务,独立goroutine运行,进一步节省系统资源
	p.monitorAndClear()
	return p, nil
}
4、提交任务到 Pool
  • 第一个 if 判断当前 Pool 是否已被关闭,若是则不再接受新任务,否则获取一个 Pool 中可用的 worker,绑定该 task 执行
func (p *Pool) Submit(task f) error {
	if len(p.release) > 0 {
		return ErrPoolClosed
	}
	w := p.getWorker()
	w.task <- task
	return nil
}
5、获取可用 worker(核心)
  • p.getWorker()源码
//返回一个可用的 worker 来运行这些任务
func (p *Pool) getWorker() *Worker {
    var w *Worker
    waiting :=false    //标志变量,判断当前正在运行的 worker 数量是否已到达 Pool 的容量上限
    p.lock。Lock()    //加锁,检测队列中是否有可用 worker,并进行相应操作
 idleWorkers := p.workers
	n := len(idleWorkers) - 1
	if n < 0 {	 // 当前队列中无可用worker
		// 判断运行worker数目已达到该Pool的容量上限,置等待标志
		waiting = p.Running() >= p.Cap()
	} else {		// 当前队列有可用worker,从队列尾部取出一个使用
		w = idleWorkers[n]
		idleWorkers[n] = nil
		p.workers = idleWorkers[:n]
	}

	p.lock.Unlock()	  // 检测完成,解锁
	if waiting {	 // Pool容量已满,新请求等待
		for {		 // 利用锁阻塞等待直到有空闲worker
			p.lock.Lock()
			idleWorkers = p.workers
			l := len(idleWorkers) - 1
			if l < 0 {
				p.lock.Unlock()
				continue
			}
			w = idleWorkers[l]
			idleWorkers[l] = nil
			p.workers = idleWorkers[:l]
			p.lock.Unlock()
			break
		}
		// 当前无空闲worker但是Pool还没有满,则可以直接新开一个worker执行任务
	} else if w == nil {
		w = &Worker{
			pool: p,
			task: make(chan f, 1),
		}
		w.run()
		// 运行worker数加一
		p.incRunning()
	}
	return w
}
6、执行任务
  • 结合前面的 p.Submit(task f) 和 p.getWorker() ,提交任务到 Pool 之后,获取一个可用 worker
  • 每新建一个 worker 实例之时都需要调用 w.run() 启动一个 goroutine 监听 worker 的任务列表 task ,一有任务提交进来就执行
  • 所以,当调用 worker 的 sendTask(task f) 方法提交任务到 worker 的任务队列之后,马上就可以被接收并执行
  • 当任务执行完之后,会调用 w.pool.putWorker(w *Worker) 方法将这个已经执行完任务的 worker 从当前任务解绑放回 Pool 中,以供下个任务可以使用
  • 至此,一个任务从提交到完成的过程就此结束,Pool 调度将进入下一个循环。
// Worker是运行任务的实际执行者,它启动一个接受任务并执行函数调用的goroutine
type Worker struct {
	pool *Pool   // 每个pool对应一个worker
	task chan f  // 任务是一项应该完成的工作
	recycleTime time.Time	 // 当将一个worker放回队列时,recycleTime将被更新。
}

// Run启动一个goroutine以重复执行函数调用的过程
func (w *Worker) run() {
	go func() {
		// 循环监听任务列表,一旦有任务立马取出运行
		for f := range w.task {
			if f == nil {
				// 退出goroutine,运行worker数减一
				w.pool.decRunning()
				return
			}
			f()
			// worker回收复用
			w.pool.putWorker(w)
		}
	}()
}
7、worker回收(goroutine 复用)
// putWorker将一个worker放回空闲池,回收goroutines
func (p *Pool) putWorker(worker *Worker) {
	// 写入回收时间,亦即该worker的最后一次结束运行的时间
	worker.recycleTime = time.Now()
	p.lock.Lock()
	p.workers = append(p.workers, worker)
	p.lock.Unlock()
}
8、动态扩容或者缩小池容量
// ReSize更改此池的容量
func (p *Pool) ReSize(size int) {
	if size == p.Cap() {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))
	diff := p.Running() - size
	if diff > 0 {
		for i := 0; i < diff; i++ {
			p.getWorker().task <- nil
		}
	}
}
9、定期清理过期 Worker
  • 定期检查空闲 worker 队列中是否有已过期的 worker 并清理
  • 因为采用了 LIFO 后进先出队列存放空闲 worker,所以该队列默认已经是按照 worker 的最后运行时间由远及近排序
  • 可以方便地按顺序取出空闲队列中的每个 worker 并判断它们的最后运行时间与当前时间之差是否超过设置的过期时长
  • 若是,则清理掉该 goroutine,释放该 worker,并且将剩下的未过期 worker 重新分配到当前 Pool 的空闲 worker 队列中,进一步节省系统资源
//  定期清理过期 Worker
func (p *Pool) periodicallyPurge() {
	heartbeat := time.NewTicker(p.expiryDuration)
	for range heartbeat.C {
		currentTime := time.Now()
		p.lock.Lock()
		idleWorkers := p.workers
		if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 {
			p.lock.Unlock()
			return
		}
		n := 0
		for i, w := range idleWorkers {
			if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
				break
			}
			n = i
			w.task <- nil
			idleWorkers[i] = nil
		}
		n++
		if n >= len(idleWorkers) {
			p.workers = idleWorkers[:0]
		} else {
			p.workers = idleWorkers[n:]
		}
		p.lock.Unlock()
	}
}

二、pool 使用

1、公共池
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

func main() {
	// 在retrieveWorker()中可能有一些调用者在等待,因此我们需要唤醒它们来防止那些无限阻塞的调用者
	defer ants.Release()
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", ants.Running())
	fmt.Printf("finish all tasks.\n")
}

/*
	Hello World!
	Hello World!
	running goroutines: 1000
	finish all tasks.
 */
2、方法绑定池
package main

import (
	"fmt"
	"github.com/panjf2000/ants/v2"
	"sync"
)

func myFunc(i interface{}) {
	fmt.Printf("run with %d\n", i)
}

func main() {
	defer ants.Release()
	var wg sync.WaitGroup
	// 使用池和函数,设置goroutine pool的容量为10,超时时间为1秒。
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// 逐个提交任务
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
}

/*
run with 976
run with 990
run with 971
running goroutines: 10
 */

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1310986.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue引入字节跳动图标库

复制下面的命令安装IconPark库&#xff1a; npm install icon-park/vue-next --save 如果你不在乎打包大小&#xff0c;可以全局注册IconPark并使用。在你的main.js中配置如下代码&#xff1a; import { createApp } from "vue";import App from "./App.vue&qu…

基于SSM的OA办公系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

计算两股不同流量的气体,通过换热器换热后,高温气体的出口温度

# -*- coding: utf-8 -*- """ Created on Thu Nov 30 11:23:12 2023 计算两股不同流量的气体&#xff0c;通过换热器换热后&#xff0c;高温气体的出口温度 (煤烟二级&#xff0c;计算煤烟二级热侧出口温度) ------------------------------------------------ …

云原生架构总结-读书笔记

云原生架构进阶实战-读书笔记 云原生概念 云原生&#xff08;Cloud Native&#xff09;概念是由Pivotal的Matt Stine在2013年首次提出的。这个概念得到了社区的不断完善&#xff0c;内容越来越丰富&#xff0c;目前已经**包括了DevOps&#xff08;Development和Operations的组…

ROS2 Control分析讲解

ROS2 Control 文章目录 前言简述组成安装 框架Controller ManagerResource ManagerControllersUser Interfaces Hardware ComponentsURDF中的硬件描述机器人运行框架 总结 前言 ros2_control是一个使用&#xff08;ROS 2&#xff09;进行机器人&#xff08;实时&#xff09;控…

【亚马逊云科技】通过高性能低延迟对象存储 S3实现网站资源托管

本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 文章目录 前言1 S3 介绍1.1 优点 2 使用步骤2.1 注册账户2.2 创建存储桶2.2.1 打开控制…

WEB服务器介绍

Web服务器是指驻留于因特网上某种类型计算机的程序。当Web浏览器连到服务器上并请求文件时&#xff0c;服务器将处理该请求并将文件发送到该浏览器上&#xff0c;附带的信息会告诉浏览器如何查看该文件&#xff0c;即文WEB服务器件类型。服务器使用HTTP进行信息交流&#xff0c…

springboot获取配置文件属性值

Value&#xff1a; 作用&#xff1a;属性注入&#xff0c;需要每个值上进行书写变量名 ConfigurationProperties 指定外部属性文件。在类上添加&#xff0c;常与ConfigurationProperties 配合使用

RK3568平台 OTA升级原理

一.前言 在迅速变化和发展的物联网市场&#xff0c;新的产品需求不断涌现&#xff0c;因此对于智能硬件设备的更新需求就变得空前高涨&#xff0c;设备不再像传统设备一样一经出售就不再变更。为了快速响应市场需求&#xff0c;一个技术变得极为重要&#xff0c;即OTA空中下载…

Python占位符%详解:格式化字符串的利器

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在Python中&#xff0c;%占位符是一种强大的工具&#xff0c;用于格式化字符串。本文将深入解析Python中占位符的使用方法&#xff0c;包括字符串格式化、数字格式化、日期格式化等多个方面。通过丰富的示例代码…

FolkMQ 国产消息中间件,v1.0.21 发布

简介 采用 “多路复用” “内存运行” “快照持久化” “Broker 集群模式”&#xff08;可选&#xff09;基于 Socket.D 网络应用协议 开发。全新设计&#xff0c;自主架构&#xff01; 角色功能生产端发布消息&#xff08;Qos0、Qos1&#xff09;、发布定时消息&#xff…

【多线程】线程池详解

文章目录 什么是线程池生活案例理解线程池为什么使用线程池线程池的优点 自定义线程池内置线程池总结 什么是线程池 线程池其实就是一种多线程处理形式&#xff0c;处理过程中可以将任务添加到队列中&#xff0c;然后在创建线程后自动启动这些任务。这里的线程就是我们前面学过…

【经验分享】gemini-pro和gemini-pro-vision使用体验

Gemini Gemini已经对开发者开放了Gemini Pro的使用权限&#xff0c;目前对大家都是免费的&#xff0c;每分钟限制60条&#xff0c;至少这比起CloseAI的每个账户5刀限速1min3条要香的多&#xff0c;目前已于第一时间进行了体验 一句话总结&#xff0c;google很大方&#xff0c;但…

影视动画行业发展现状与方向:AI技术推动动画工业化体系新变革

工业化体系 是国产动画强国的必经之路 中国动画的百年历程不仅是创作者们展现艺术才华的历程&#xff0c;也是一代代中国动画人不懈追求动画工业体系建设的历程。 为什么现在的中国动画需要建立工业化体系呢&#xff1f; 举个例子&#xff0c;在建立工业化体系之前&#xff…

Colorful Grid Codeforces Round 910 (Div. 2) C

Problem - C - Codeforces 题目大意&#xff1a;有一个n*m的网格&#xff0c;要求从(1,1)走到(n,m)&#xff0c;同时要求路径的长度必须为k1&#xff0c;然后给每个两点之间的路径染成红色或蓝色&#xff0c;要求任意两个相邻线段颜色不能相同&#xff0c;求涂色的方案 3<…

MSF学习

之前的渗透测试中 其实很少用到 cs msf 但是在实际内网的时候 可以发现 msf cs 都是很好用的 所以现在我来学习一下 msf的使用方法 kali自带msf https://www.cnblogs.com/bmjoker/p/10051014.html 使用 msfconsole 启动即可 首先就是最正常的木马生成 所以这里其实只需…

Selenium三大等待(详解版)

一、强制等待 1.设置完等待后不管有没有找到元素&#xff0c;都会执行等待&#xff0c;等待结束后才会执行下一步 2.实例&#xff1a; driver webdriver.Chrome()driver.get("https://www.baidu.com")time.sleep(3) # 设置强制等待driver.quit() 二、隐性等待 …

【Spring】04 国际化

文章目录 1. 定义2. Spring 的支持1&#xff09; MessageSource接口2&#xff09; ResourceBundleMessageSource 3. 配置国际化1&#xff09;配置MessageSource Bean2&#xff09;创建资源文件3&#xff09;在Bean中使用国际化消息 4. 使用占位符和参数结语 Spring 为我们提供了…

深入理解C语言的函数参数

1、一个简单的函数 int Add(int x, int y) {return x y; }int main() {printf("%d", Add(2, 3, 4, 5, 6));return 0; } 这一段足够简单的代码&#xff0c;闭眼都能知道运行结果会在屏幕上打印 5 。那编译器是怎么处理后面的 4、5、6 &#xff1f; 我们再看看这个函…

【系统架构】集群、分布式概念及系统架构演进过程

集群、分布式概念&#xff1a; 对食物没有太高要求的人在肚子饿的时候一般都会选择去兰州拉面、沙县小吃等小饭馆&#xff0c;这类小饭馆有个很显著的特点&#xff1a;洗菜、切菜、炒菜都是同一个人完成&#xff0c;如果厨子不舒服可能饭馆还会歇业。而一些人流量较大的饭馆的分…