go-resiliency源码解析之-batcher

news2025/1/12 10:52:38

go-resiliency源码解析之-batcher

源代码地址 : https://github.com/eapache/go-resiliency/blob/master/batcher/batcher.go

1.batcher定义

创建一个batch对象需要2个参数:

Timeout:超时,这是一个batch对象收集输入参数的时间。

work函数变量:在timeout超时后,会调用一次work函数,来处理每一个输入参数。

整体处理流程如下图:

请添加图片描述

2.核心源码解析

核心结构定义

type work struct {
  //收集的一个参数
	param  interface{}
	//参数处理返回
	future chan error
}


type Batcher struct {
  //收集参数的超时时间
	timeout   time.Duration
	//过滤器函数
	prefilter func(interface{}) error
  //互斥量,用于参数收集并发控制
	lock   sync.Mutex
	//存储收集到参数的chan
	submit chan *work
	//批处理函数,超时后,调用该函数一次,处理全部参数
	//[]interface{}
	doWork func([]interface{}) error
	done   chan bool
}

Run函数

//param是timeout内可收集参数,业务方调用Run函数传入参数
func (b *Batcher) Run(param interface{}) error {
  //先判断是否有过滤器函数。 prefilter相当于一个数据清洗函数,对无效param参数返回err,这样
  //在dowork里就不会处理这个输入参数
	if b.prefilter != nil {
		if err := b.prefilter(param); err != nil {
			return err
		}
	}

  //timeout==0表示无收集参数时间,需要立刻执行doWork函数
	if b.timeout == 0 {
		return b.doWork([]interface{}{param})
	}

  //当timeout > 0 ,就构造一个work对象放入到chan里
	w := &work{
		param:  param,
		future: make(chan error, 1),
	}

	b.submitWork(w)

	return <-w.future
}

func (b *Batcher) Prefilter(filter func(interface{}) error) {
	b.prefilter = filter
}

submitWork函数:在Run函数里,当timeout > 0会调用submitWork函数

func (b *Batcher) submitWork(w *work) {
	//这里为什么要加一个互斥锁?
	//对,主要是防止下面if里的代码被并发执行
	b.lock.Lock()
	defer b.lock.Unlock()
  
  //创建submit的chan, 开启一个batch协程
	if b.submit == nil {
		b.done = make(chan bool)
		b.submit = make(chan *work, 4)
		go b.batch()
	}

	b.submit <- w
}

func (b *Batcher) batch() {
	//params为收集参数集合
	var params []interface{}
	var futures []chan error
	input := b.submit

	go b.timer()

  //for读取input这个chan,input在没有close前,这个for不会退出
  //所以这里就是在等待timeout时间,把输入的参数收集到params这个切片
  //?? 那input chan什么时候被close了?? 就是	go b.timer()这一句
	for work := range input {
		params = append(params, work.param)
		futures = append(futures, work.future)
	}

  //这里就是把收集到的参数传入到你设置的函数,执行业务逻辑
	ret := b.doWork(params)

  //把doWork执行结果写回到future,这样调用线程就可以读取到执行结果
	for _, future := range futures {
		future <- ret
		close(future)
	}
	close(b.done)
}


func (b *Batcher) timer() {
	//阻塞协程timeout时间,然后调用flush函数
	time.Sleep(b.timeout)
 
  //主要就是关闭submit这个chan,让batch里收集参数for循环退出
	b.flush()
}

func (b *Batcher) flush() {
	b.lock.Lock()
	defer b.lock.Unlock()

	if b.submit == nil {
		return
	}

	close(b.submit)
	b.submit = nil
}

3.测试用例

这个测试用例实现,在1s内收集传入的整形,然后求和

func TestBatcher(t *testing.T) {
	wg := &sync.WaitGroup{}

	b := New(time.Second, func(params []interface{}) error {

		sum := 0
		for _, p := range params {
			sum += p.(int)
		}
		t.Logf("sum %d", sum)
		return nil
	})

	b.Prefilter(func(param interface{}) error {
		// do some sort of sanity check on the parameter, and return an error if it fails
		return nil
	})

	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(param interface{}) {
			go b.Run(i)
			wg.Done()
		}(i)

	}

	wg.Wait()

	time.Sleep(5 * time.Second)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/498733.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Jumpserver+Frp集中管理远程内网服务器

我们设想这样一种情况&#xff0c;我们是一家小公司&#xff0c;在全国有几个小分支办公机构&#xff0c;每个机构有一台服务器&#xff0c;由于公司财务紧张&#xff0c;买不起专线&#xff0c;用的也是普通家用宽带&#xff0c;SD-WAN设备说实话也挺贵的&#xff0c;那么我们…

Docker部署MySQL主从复制架构

文章目录 1、准备工作2、下拉镜像&#xff0c;启动容器3、编辑 MySQL 主节点配置4、编辑 MySQL 从节点配置5、通过 Navicat 配置主节点6、通过 Navicat 配置从节点7、最终测试 1、准备工作 云服务器&#xff08;虚拟机也可以&#xff09;安装Docker&#xff0c;参考该文章第1小…

Telnet 【实验】

1 什么是Telnet&#xff1f; Telnet是进行远程登录的标准协议&#xff0c;它是当今Internet上应用最广泛的协议之一。它把用户正在使用的终端或计算机变成网络某一远程主机的仿真终端&#xff0c;使得用户可以方便地使用远程主机上的软、硬件资源。 为什么需要telnet&#xf…

python自动化程序操作浏览器在后台访问网站

目录 1、简介2、详细步骤3、驱动3.1、ChromeDriver3.2、GeckoDriver3.3、Microsoft WebDriver3.4、下载链接 4、完整代码 ⭐在当今信息化的时代&#xff0c;访问网站已经成为人们生活中必不可少的一部分。 1、简介 对于某些需要批量访问网站的任务&#xff0c;手动访问既费时又…

Java - Thread、ThreadLocal、ThreadLocalMap

一、概念 用处每个线程需要持有自己单独的数据&#xff0c;数据在同线程中被多个地方共享&#xff0c;但多线程中不共享。原理创建 ThreadLocal 对象用来存取值&#xff0c;操作的是 CurrentThread 里的 ThreadLocalMap&#xff0c;由于每个 Thread 中都有自己的 ThreadLocalM…

Codeforces Round 870 (Div. 2)

Codeforces Round 870 (Div. 2) A. Trust Nobody 题意&#xff1a; 给你一个数组a&#xff0c;a[i]表示第i个人认为至少有a[i]个人说谎&#xff0c;请你找到正确的说谎人数 思路&#xff1a; 我们认为说谎人数有x人&#xff0c;那么数组a中大于x的元素都是说谎&#xff0c;…

rosbag相关操作

一些很好用的网站 时间戳在线转换网页 旋转矩阵、四元数、绕轴旋转、欧拉角在线转换网页 四元数、欧拉角可视化在线转换网页 一、按时间截取bag 使用如下代码&#xff1a; rosbag filter 原始包名.bag 截取后的包名.bag "t.to_sec() > 开始时间 and t.to_sec() <…

机器学习随记(3)— Softmax

Softmax 与sigmoid不同&#xff0c;softmax可以多个输出&#xff0c;每个输出以概率的形式表示。 或者 def my_softmax(z):ez np.exp(z) sm ez/np.sum(ez)return(sm) 1 损失函数Loss Softmax 相关的损失函数&#xff0c;即交叉熵损失&#xff1a; 2 成本函数Cos…

Vben Admin 自学记录 —— 路由跳转的基本使用及练习(持续更新中...)

路由 项目路由配置存放于 src/router/routes 下面。 src/router/routes/modules用于存放路由模块&#xff0c;在该目录下的文件会自动注册。 路由相关使用及概念 练习 —— 在之前table基础上&#xff0c;添加新增功能&#xff0c;点击新增按钮&#xff0c;跳转页面&#xf…

电脑中病毒了怎么修复、怎么办,计算机windows系统如何预防faust勒索病毒

随着科技技术的不断发展&#xff0c;计算机已经成为现代人工作和生活中的必备工具之一&#xff0c;同时衍生出的各种计算机病毒也越来越多。各种勒索家族的加密病毒程序不断升级&#xff0c;给我们的工作和生活带来了极大困扰&#xff0c;特别是最近爆发的一种新型计算机病毒—…

如何优化 yolov8 模型,压缩模型大小,部署到边缘设备上

在CV领域&#xff0c;YOLO系列目标检测模型是一种非常流行的深度学习网络模型。yolov8是23年1月10号开源的最新版本。虽然YOLOv8已经在精度和速度方面取得了显著的改进&#xff0c;但我们仍然可以从轻量化角度等很多方面来进一步优化该模型。 模型压缩&#xff1a;使用轻量化的…

华为EC6108V9A_RK3128_安卓4.4.4_卡刷固件包-内有教程-当贝纯净桌面

华为EC6108V9A_RK3128_安卓4.4.4_卡刷固件包-内有教程-当贝纯净桌面 特点&#xff1a; 1、适用于对应型号的电视盒子刷机&#xff1b; 2、开放原厂固件屏蔽的市场安装和u盘安装apk&#xff1b; 3、修改dns&#xff0c;三网通用&#xff1b; 4、大量精简内置的没用的软件&a…

不用花一分钱!!!获得一个自己的网页版chatGPT

不用花一分钱&#xff01;&#xff01;&#xff01;获得一个自己的网页版chatGPT 当然还是需要一个chatGPT账号的&#xff0c;不会注册的同学可以看一下这篇文章 chatGPT到底要怎么注册 那就先让我们看一下效果吧 chatgpt-web介绍 github项目地址 https://github.com/Chanzha…

【源码解析】实现异步功能的注解 @Async 的源码解析

使用方式 启动类上添加注解EnableAsync()在方法或者类上添加Async 源码解析 初始化配置 EnableAsync注入了AsyncConfigurationSelector Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) Documented Import(AsyncConfigurationSelector.class) public interf…

【郭东白架构课 模块二:创造价值】23|节点四:架构规划之统一语义

你好&#xff0c;我是郭东白。从这节课开始&#xff0c;我们就进入到架构活动的第四个环节——架构规划。这个环节比较复杂&#xff0c;可以分为四个部分&#xff1a;统一语义、需求确认、边界划分和规划确认。这节课我们先来讲统一语义。 架构师的工作日常就是跟不同的角色沟…

Mysql索引(4):索引语法

1 创建索引 CREATE [ UNIQUE | FULLTEXT ] INDEX index_name ON table_name (index_col_name,... ) ; 2 查看索引 SHOW INDEX FROM table_name; 3 删除索引 DROP INDEX index_name ON table_name; 4 案例演示 先来创建一张表 tb_user&#xff0c;并且查询测试数据。 creat…

从零开始学习Linux运维,成为IT领域翘楚(九)

文章目录 &#x1f525;Linux系统服务&#x1f525;Linux系统定时任务 &#x1f525;Linux系统服务 Service命令 服务(service) 本质就是进程&#xff0c;但是是运行在后台的&#xff0c;通常都会监听某个端口&#xff0c;等待其它程序的请求&#xff0c;比如(mysql , sshd 防…

Vue加SpringBoot实现项目前后端分离

首先需要搭建一个Vue的脚手架项目&#xff08;已经放在gitee里面了&#xff0c;下面是gitee网址&#xff0c;可以直接拉&#xff09; 那么接下来就是实现前后端分离的步骤 首先我们需要有一个登录页面 登录的点击事件利用axios提交到后台去&#xff0c;代码放在后面&#xff08…

【C++修炼之路:二叉搜索树】

目录&#xff1a; 二叉搜索树的概念构建一颗二叉树二叉树的查找二插树的插入 二叉树的删除删除右子树的最小节点 写一个中序来走这个二叉搜索树递归版删除&#xff08;recursion&#xff09;递归版插入&#xff08;recursion&#xff09;递归版查找&#xff08;recursion&#…

基于AT89C51单片机的电子密码锁设计与仿真

点击链接获取Keil源码与Project Backups仿真图&#xff1a; https://download.csdn.net/download/qq_64505944/87760996?spm1001.2014.3001.5503 源码获取 主要内容&#xff1a; &#xff08;1&#xff09;本设计为了防止密码被窃取要求在输入密码时在LCD屏幕上显示*号。 &a…