Go 异步任务

news2025/1/10 3:15:36

Go 异步任务

异步任务在开发中很常见,用来做解耦。本文介绍一下异步队列的实现的几个问题,并且结合三方库实现来分析。

有下面的几个关键点:

  1. 用户代码(任务)如何封装
  2. 数据的存放(数据存放在哪里?就是一个读取队列)
  3. worker的管理(worker的数量,worker执行是否支持超时时间,worker的异常恢复)

带着上面的问题,对比https://github.com/golang-queue/queue的实现,说明一下。

用户代码如何封装

对于任务来说,最重要的是 函数操作,也就是对应的代码逻辑。go中是可以将方法作为参数传递的,方法也是一种类型。所以我们定义下面的方法,方法签名如下:

type TaskFunc func(ctx context.Context) error

还可以配置方法的callback逻辑,比如重试次数,重试间隔,重试error的判断等等

抽象出一个结构体来表示

https://github.com/golang-queue/queue/blob/master/job/job.go#L15
在这里插入图片描述

数据的存放

这是很好拓展的地方,可以支持多种存储媒介和中间件,比如基于内存实现的循环队列,redis,rocketmq。

在实现上就是接口抽象功能,依赖倒转。接口有下面的两个功能

  1. 存数据
  2. 取数据

https://github.com/golang-queue/queue/blob/master/core/worker.go
在这里插入图片描述

解释一下QueuedMessage接口和Worker中的Run方法

  1. QueuedMessage

    用来做数据转换的。

  2. Run

    用来执行函数,表示执行的任务。

worker的管理

worker管理涉及到下面几个方面

  1. worker的数量限制
  2. worker执行时候的超时时间
  3. worker执行时候的异常panic
  4. workder从队列中获取需要处理的处理,并且支持请求超时操作
  5. 服务关闭之后worker也需要操作

我们来看golang-queue/queue中的实现是什么?

通过metric来记录queue在运行期间具体的情况

https://github.com/golang-queue/queue/blob/master/metric.go#L20

在这里插入图片描述

并且通过 channel 来做限制。

每次在goroutine启动和停止的时候通过metric来计数。并且会调用schedule来发信号,给ready发送信号。

goroutine在启动的时候会select ready。

在这里插入图片描述

work的异常情况,在调用task的处理函数的时候,肯定要用到defer来做error恢复,并且通过channel来通信,context来实现超时控制。

具体的原理,我们从下面的代码开始来分析。

https://github.com/golang-queue/queue/blob/master/queue.go#L285

// 对于start来说,是一个死循环,会启动一个goroutine从work中获取数据,当前goroutine等待结果,并且启动goroutine来执行,此Goroutine叫做worker。
func (q *Queue) start() {
	// QueuedMessage 表示message
	tasks := make(chan core.QueuedMessage, 1)
	// 启动一个goroutine来处理任务
	// 从work中获取任务,并且启动一个goroutine来处理任务
	for {
		// check worker number
    // 做调度的,就是检查work的数量
		q.schedule()
		
    // 数量不够,需要堵塞
		select {
		// wait worker ready
		case <-q.ready:
		case <-q.quit:
			return
		}

	// 启动一个goRoutine从 work中获取数据
		q.routineGroup.Run(func() {
			for {
				// 从队列中获取一个请求
				t, err := q.worker.Request()
				// 没有消息,或者有错误
				if t == nil || err != nil {
					// 有错误
					if err != nil {
						select {
              // 队列退出,关闭掉task,
						case <-q.quit:
							if !errors.Is(err, ErrNoTaskInQueue) {
								close(tasks)
								return
							}
              // 等待一秒再次从work中抓取新数据
						case <-time.After(time.Second):
							// sleep 1 second to fetch new task
						}
					}
				}
				if t != nil { // 说明取到了消息
					tasks <- t
					return
				}
				// 说明t为nil但是没有错误
				select {
				case <-q.quit:
					if !errors.Is(err, ErrNoTaskInQueue) {
						close(tasks)
						return
					}
				default:
				}
			}
		})
		// 这就是从queue中获取一个task,之后将此task提交给work来实现
		task, ok := <-tasks
		if !ok {
			return
		}
		// 所以,这里并没有维护所谓的goroutine池,因为go的编程是不需要这些玩意的。goroutine已经很轻量级的了,直接提交运行就好了
		// start new task
		q.metric.IncBusyWorker()
		q.routineGroup.Run(func() {
			q.work(task)
		})
	}
}

func (q *Queue) work(task core.QueuedMessage) {
	var err error
	// 来处理一些内部的错误,在这里会减去worker的数量,并且重新schedule
	defer func() {
		q.metric.DecBusyWorker()
		e := recover()
		if e != nil {
			q.logger.Errorf("panic error: %v", e)
		}
		q.schedule()

		// increase success or failure number
		if err == nil && e == nil {
			q.metric.IncSuccessTask()
		} else {
			q.metric.IncFailureTask()
		}
	}()
	// 运行任务,可以看到这里的代码就是为了包装一下
	if err = q.run(task); err != nil {
		q.logger.Errorf("runtime error: %s", err.Error())
	}
}

func (q *Queue) run(task core.QueuedMessage) error {
	data := task.(*job.Message)
	if data.Task == nil {
		data = job.Decode(task.Bytes())
		data.Data = data.Payload
	}

	return q.handle(data)
}

func (q *Queue) handle(m *job.Message) error {
	// create channel with buffer size 1 to avoid goroutine leak
	// 这是go中很创建的做法,一个channel中有数据,但并没有被其他的任何的goroutine操作的话,也是会被gc掉的
	done := make(chan error, 1) // 完成的信号channel
	panicChan := make(chan interface{}, 1) // panic的channel
	startTime := time.Now() 
	ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)
	defer func() {
		cancel()
	}()

	// run the job 启动goroutine来运行一个job
	go func() {
		// handle panic issue
		defer func() {
			if p := recover(); p != nil {
				panicChan <- p
			}
		}()

		// run custom process function
		var err error
		// 做重试逻辑,这里的重试逻辑还可以指定重试的错误,比如as,基于那种类型的错误来做重试操作等。
		b := &backoff.Backoff{
			Min:    m.RetryMin,
			Max:    m.RetryMax,
			Factor: m.RetryFactor,
			Jitter: m.Jitter,
		}
		delay := m.RetryDelay
   // backoff都是通过for循环来做的
	loop:
		for {
      // 两种形式,一种是直接function,一直是通过message
			if m.Task != nil {
				err = m.Task(ctx)
			} else {
				err = q.worker.Run(ctx, m)
			}

 	    // 不需要重试就直接返回,如果有错误就开始重试,并且利用time来做重试时间的控制
			if err == nil || m.RetryCount == 0 {
				break
			}
			m.RetryCount--

			if m.RetryDelay == 0 {
				delay = b.Duration()
			}
			// 这里用select来做操作
			select {
			case <-time.After(delay): // retry delay
				q.logger.Infof("retry remaining times: %d, delay time: %s", m.RetryCount, delay)
			case <-ctx.Done(): // timeout reached // ctx完成就直接返回
				err = ctx.Err()
				break loop
			}
		}

		done <- err
	}()
	// 当前的goroutine在等待结果,
	select {
	case p := <-panicChan:
		panic(p)
	case <-ctx.Done(): // timeout reached
		return ctx.Err()
	case <-q.quit: // shutdown service
		// cancel job
		cancel()

		leftTime := m.Timeout - time.Since(startTime)
		// wait job
		select {
		case <-time.After(leftTime):
			return context.DeadlineExceeded
		case err := <-done: // job finish
			return err
		case p := <-panicChan:
			panic(p)
		}
	case err := <-done: // job finish
		return err
	}
}

有个问题,如何保证程序退出的时候这些work可以执行结束呢?利用waitGroup实现。

https://github.com/golang-queue/queue/blob/master/thread.go

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/851369.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java——基础语法(二)

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

【SpringBoot】| ORM 操作 MySQL(集成MyBatis)

目录 一&#xff1a;ORM 操作 MySQL 1. 创建 Spring Boot 项目 2. MapperScan 3. mapper文件和java代码分开管理 4. 事务支持 一&#xff1a;ORM 操作 MySQL 使用MyBatis框架操作数据&#xff0c; 在SpringBoot框架集成MyBatis&#xff0c;使用步骤&#xff1a; &#x…

恒压恒流模式的工作原理及直流电源的应用

直流电源的两种基本工作模式为恒压、恒流&#xff0c;恒压输出时&#xff0c;电压恒定&#xff0c;随着负载发生变动&#xff0c;电流随之改变&#xff0c;当电流到达设定的阈值时&#xff0c;切换到恒流模式&#xff0c;维持目标电流为恒定值。 下面给出直流电源的伏安特性曲…

【数据结构与算法】十大经典排序算法-快速排序

&#x1f31f;个人博客&#xff1a;www.hellocode.top &#x1f3f0;Java知识导航&#xff1a;Java-Navigate &#x1f525;CSDN&#xff1a;HelloCode. &#x1f31e;知乎&#xff1a;HelloCode &#x1f334;掘金&#xff1a;HelloCode ⚡如有问题&#xff0c;欢迎指正&#…

Spring中的循环依赖问题

文章目录 前言一、什么是循环依赖&#xff1f;二、三级缓存三、图解三级缓存总结 前言 本文章将讲解Spring循环依赖的问题 一、什么是循环依赖&#xff1f; 一个或多个对象之间存在直接或间接的依赖关系&#xff0c;这种依赖关系构成一个环形调用&#xff0c;有下面 3 种方式…

根据数组中各值是否满足指定条件决定是否将其按指定规则计算更新numpy.putmask()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 根据数组中各值是否满足指定条件 决定是否将其按指定规则计算更新 numpy.putmask() 选择题 以下程序的运行结果是? import numpy as np xnp.array([1,2,3,4,5]) print("【显示】x:\n&quo…

【Spring】如果你需要使用重试机制,请使用Spring官方的Spring Retry

文章目录 前言Spring Retry的基本使用第一步&#xff0c;引入Spring Retry的jar包第二步&#xff0c;构建一个RetryTemplate类第三步&#xff0c;使用RETRY_TEMPLATE注意事项 拓展方法降级操作重试策略&#xff1a;时间策略重试策略&#xff1a;指定异常策略 前言 Spring Retr…

Vue3 第五节 一些组合式API和其他改变

1.provide和inject 2.响应式数据判断 3.Composition API的优势 4.新的组件 5.其他改变 一.provide和inject 作用&#xff1a;实现祖与后代组件间通信 套路&#xff1a;父组件有一个provide选项来提供数据&#xff0c;后代组件有一个inject选项来开始使用这些数据 &…

centos 7镜像(iso)下载图文教程(超详细)

声明&#xff1a;本教程为本人学习笔记&#xff0c;仅供参考 文章目录 前言一、阿里云镜像站下载centos 7 二、清华源下载centos 7小结 前言 声明&#xff1a;本教程为本人学习笔记&#xff0c;仅供参考 本教程将提供两种方式下载centos 7 系统镜像 1、阿里巴巴开源镜像站 2、…

Unity之ShaderGraph 节点介绍 Procedural节点

程序化 噪声Gradient Noise&#xff08;渐变或柏林噪声&#xff09;Simple Noise&#xff08;简单噪声&#xff09;Voronoi&#xff08;Voronoi 噪声&#xff09; 形状Ellipse&#xff08;椭圆形&#xff09;Polygon&#xff08;正多边形&#xff09;Rectangle&#xff08;矩形…

VM虚拟机和主机互相ping不通,但是ssh能连上,也能访问外网

直接还原默认设置&#xff0c;然后点确定 注意&#xff0c;你还原设置以后ip也会变&#xff0c;ifconfig自己重新看一下

大数据-玩转数据-Sink到Kafka

一、添加Kafka Connector依赖 pom.xml 中添加 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${kafka.version}</version></dependency&g…

IMV4.0

背景内容&#xff1a; 经历了多个版本&#xff0c;基础内容在前面&#xff0c;可以使用之前的基础环境&#xff1a; v1&#xff1a; https://blog.csdn.net/wtt234/article/details/132139454 v2&#xff1a; https://blog.csdn.net/wtt234/article/details/132144907 v3&#…

Docker mysql+nacos单机部署

docker 网络创建 由于nacos需要访问mysql的数据&#xff0c;因此mysql容器和nacos容器之间需要进行通信。容器间通信有很多方式&#xff0c;在这里采用同一网络下的方式进行实现。因此需要创建网络。创建网络的命令如下&#xff1a; docker network create --driver bridge n…

致远OA任意管理员登录

真的&#xff0c;如果痛苦不能改变生存&#xff0c;那还不如平静地将自己毁灭。毁灭。一切都毁灭了&#xff0c;只有生命还在苟延残喘。这样的生命还有什么存在的价值&#xff1f; 漏洞复现 访问漏洞url 构造payload POST /seeyon/thirdpartyController.do HTTP/1.1methoda…

【数据结构与算法】平衡二叉树(AVL树)

平衡二叉树&#xff08;AVL树&#xff09; 给你一个数列{1,2,3,4,5,6}&#xff0c;要求创建二叉排序树&#xff08;BST&#xff09;&#xff0c;并分析问题所在。 BST 存在的问题分析&#xff1a; 左子树全部为空&#xff0c;从形式上看&#xff0c;更像一个单链表。插入速度…

pg实现月累计

获取每月累计数据&#xff1a; ​​​ SELECT a.month, SUM(b.total) AS total FROM ( SELECT month, SUM(sum) AS total FROM ( SELECT to_char(date("Joinin"),YYYY-MM) AS month , COUNT(*) AS sum FROM "APP_HR_Staff_Basic_Info" GROUP BY month ) …

Vue中使用uuid生成唯一ID(脚手架创建自带的)

1.utils 说明&#xff1a;一般封装工具函数。 // 单例模式 import { v4 as uuidv4 } from uuid; // 要生成一个随机的字符串&#xff0c;且每次执行不能发生变化 // 游客身份还要持久存储 function getUUID(){// 先从本地获取uuid&#xff0c;本地存储里面是否有let uuid_tok…

淘宝整店商品如何批量获取?获取淘宝店铺所有商品接口item_search_shop

在竞争日益激烈的电商行业&#xff0c;不少商家出于以下的考虑&#xff0c;想要实现一键批量获取淘宝店铺的所有商品。 竞争分析&#xff1a;通过获取某个店铺内的所有商品信息&#xff0c;可以对竞争对手的产品进行全面的了解和分析。可以了解到对手的产品种类、价格、销量等情…

git和github学习

一、什么是git和github? 二、学会使用github desktop应用程序 初始使用&#xff1a; 一开始我们是新账户&#xff0c;里面是没有仓库的&#xff0c;需要手动创建一个仓库。此时&#xff0c;这个仓库是创建在本地仓库里面&#xff0c;需要用到push命令&#xff08;就是那个pub…