使用增强版 singleflight 合并事件推送,效果炸裂!

news2025/1/2 3:09:48

hello,大家好啊,我是小楼。

最近在工作中对 Go 的 singleflight 包做了下增强,解决了一个性能问题,这里记录下,希望对你也有所帮助。

singleflight 是什么

singleflight 直接翻译为”单(次)飞(行)“,它是对同一种请求的抑制,保证同一时刻相同的请求只有一个在执行,且在它执行期间的相同请求都会 Hold 直到执行完成,这些 hold 的请求也使用这次执行的结果。

举个例子,当程序中有读(如 Redis、MySQL、Http、RPC等)请求,且并发非常高的情况,使用 singleflight 能得到比较好的效果,它限制了同一时刻只有一个请求在执行,也就是并发永远为1。

singleflight 的原理

最初 singleflight 出现在 groupcache 项目中,这个项目也是 Go 团队所写,后来该包被移到 Go 源码中,在 Go 源码中的版本经过几轮迭代,稍微有点复杂,我们以最原始的源码来讲解原理,更方便地看清本质。

https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go

singleflight 把每次请求定义为 call,每个 call 对象包含了一个 waitGroup,一个 val,即请求的返回值,一个 err,即请求返回的错误。

type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

再定义全局的 Group,包含一个互斥锁 Mutex,一个 key 为 string,value 为 call 的 map。

type Group struct {
	mu sync.Mutex       
	m  map[string]*call
}

Group 对象有一个 Do 方法,其第一个参数是 string 类型的 key,这个 key 也就是上面说的 map 的 key,相同的 key 标志着他们是相同的请求,只有相同的请求会被抑制;第二个参数是一个函数 fn,这个函数是真正要执行的函数,例如调用 MySQL;返回值比较好理解,即最终调用的返回值和错误信息。

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	// ①
  g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
  // ②
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
  // ③
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}

将整个代码分成三块:

  • ① 懒加载方式初始化 map;
  • ② 如果当前 key 存在,即相同请求正在调用中,就等它完成,完成后直接使用它的 value 和 error;
  • ③ 如果当前 key 不存在,即没有相同请求正在调用中,就创建一个 call 对象,并把它放进 map,接着执行 fn 函数,当函数执行完唤醒 waitGroup,并删除 map 相应的 key,返回 value 和 error。

读可以抑制,写呢?

我们通过上面的介绍能了解,singleflight 能解决并发读的问题,但我又遇到一个并发写的问题。为了能让大家快速进入状态,先花一点篇幅描述一下遇到的实际问题:

微服务中的注册中心想必大家都有所了解,如果不了解,可以去查查相关概念,或者翻看我以前的文章,老读者应该能发现我写了很多相关的文章。

服务提供方在注册之后,会将变更事件推送到消费方,推送事件的处理流程是:接收到事件,查询组装出最新的数据,然后推送给订阅者。存在两种情况可能会导致短时间内注册请求非常多,推送事件多会影响整个注册中心的性能:

  • 接口级注册(类似 Dubbo),每台机器会注册N多次
  • 服务并发发布,例如每次发布重启100台机器,那么注册的并发就可能是100

拿到这种问题,第一想到的解法是:合并推送。但,怎么合并呢?

是不是每次推送的时候等一等,等事件都来了再一把推过去就可以了?但等多久呢?什么时候该等呢?粗暴点,每秒钟推送一次,这样就能将一秒内的时间都聚合,但这会影响推送的时效性,显然不符合我们精益求精的要求。

直接使用 singleflight,能行吗?

套用上面 singleflight ,在第一个事件推送过程中,其他相同的事件被 Hold 住,等第一个事件推送完成后,这些 Hold 的事件不再执行推送直接返回。

稍微想一下就知道这样是有问题的,假设有三个事件 A、B、C,分别对应到三个版本的数据A1、B1、C1,A 最先到达,在 A 开始推送后但没完成时 B、C 事件到达,A 事件触发推送了 A1 版本的数据,B、C 事件在 A 事件推送完成后,直接丢弃,最终推送到消费者上的数据版本为 A1,但我们肯定期望推送的数据版本为 C1,画个图线感受下:

增强一点点 🤏🏻

假设有事件 A、B、C、D 先后到达,A 事件仍然先正常执行推送,在 A 事件推送的时候,B、C、D 事件 Hlod 住,当 A 事件推送完成后,B 事件开始推送,B 事件将把 A 事件推送时期积攒的事件都一起推送掉,即 B、C、D 一次性推送完成。

增强代码参考

增强的定义为 WriteGroup,借用 singleflight 原先的实现,具体代码就不必解读了,对照上面的例子应该很好理解。

package singleflight

import (
	"sync"
)

type WriteGroup struct {
	mu    sync.Mutex
	wgs   map[string]*sync.WaitGroup
	group Group
}

func (g *WriteGroup) Do(key string, fn func() error) error {
	g.mu.Lock()
	if g.wgs == nil {
		g.wgs = make(map[string]*sync.WaitGroup)
	}
	wg, ok := g.wgs[key]
	if !ok {
		wg = &sync.WaitGroup{}
		wg.Add(1)
		g.wgs[key] = wg
	}
	g.mu.Unlock()

	if !ok {
		err := fn()

		g.mu.Lock()
		wg.Done()
		delete(g.wgs, key)
		g.mu.Unlock()
		return err
	}

	wg.Wait()
	_, err := g.group.Do(key, func() (interface{}, error) {
		return nil, fn()
	})
	return err
}

效果如何?

理论上,如果没有并发,事件和以前一样推送,没有合并,当然这也没毛病。当并发大于 2 时,开始发挥威力。在实际的压测上,注册并发 1500 时,合并的事件达到 99.9%,效果相当炸裂!

最后感谢能抽空看到这里,如果你能点赞在看分享,我会更加感激不尽~


  • 搜索关注微信公众号"捉虫大师",后端技术分享,架构设计、性能优化、源码阅读、问题排查、踩坑实践
  • 进技术交流群加微信 MrRoshi

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/544594.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(4)STM32的SPI协议及LED点亮

目录 【1】SPI协议 SPI协议 2.SPI时序 【2】LCD液晶显示屏 显示屏 【3】点亮LCD显示屏 颜色填充 ​编辑 图片显示及英文显示 汉字显示 【1】SPI协议 SPI协议 SPI接口是摩托罗拉Motorola 首先提出的全双工三线/四线同步串行外围接口,采用主从模式(Master…

产业安全专家谈|如何为直播电商企业构建全面的风控防护?

微赞是一家专注微信生态的企业级直播营销服务提供商,其核心产品“微赞直播”集引流获客、交易变现、数据分析为综合一体,能够帮助客户开展在线内容营销。为提供客户更好的直播服务,微赞与腾讯安全展开合作,凭借微赞在私域营销领域…

MYSQL 一行数据拆分成多行数据

-- WH 拆分成多行 数据 20230519 SELECT * FROM ( SELECT SKU,Primary_Warehouse, SUBSTRING_INDEX(SUBSTRING_INDEX(WH, ,, n.digit1), ,, -1) as WH FROM TMP_WH_SKU INNER JOIN ( SELECT 0 digit UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELEC…

机器学习之KNN、Python实现

文章目录 一、前言二、KNN(1)简介(2)思想: "近朱者赤近墨者黑"(3)算法实现流程(4)k值得选定1. k值得作用2. 交叉验证选取 k值 三、KNN基于sklearn实现(1.&…

苹果XR头显简史:现实困境与未来预期

近几个月来,有关苹果MR头显的消息层出不穷,机构分析师大多认为6月份的WWDC23将会亮相。作为全新品类,苹果AR/VR备受期待的同时也有一些市场顾虑,例如因AR/VR产品成熟度问题,现阶段推出MR头显也饱受争议,甚至…

单例模式与多线程

文章目录 一、 简介二、详细介绍1. 立即加载/饿汉模式2. 延迟加载/懒汉模式3. 使用静态内置类实现单例模式4. 序列化和反序列化的单例模式5. 使用static代码块实现单例模式6. 使用enum枚举数据类型实现单例模式 一、 简介 在标准的23个设计模式中,单例模式在应用中…

传染病学模型 | SIR 、SEIR传染病学模型

文章目录 SIR传染病学模型SEIR传染病学模型参考资料SIR传染病学模型 SIR模型是一种流行病学模型,用于描述传染病在人群中的传播过程。SIR模型将人群分为三个类别:易感者(Susceptible)、感染者(Infectious)和康复者(Recovered)。三个类别之间的转移可以用以下三个微分方…

二、IOC容器(1)

一、IOC操作Bean管理 1.什么是Bean管理? Spring创建对象Spring注入属性Bean管理是2个操作 2.Bean管理操作有两种方式 基于xml配置文件方式实现基于注解方式实现 二、IOC操作Bean管理(基于xml方式) 1.基于xml方式创建对象 使用bean标签&…

new与delete用法详解与底层原理,operator new与operator delete函数,定位new与内存泄漏介绍等

tips 其实进程运行起来或者说程序运行起来都是去执行函数,任务就是不断的去执行函数。C的入口就是main函数,然后在这个函数当中可能碰到程序某些调用其他函数的语句就去调用其他函数。在全局的区域可以去创建变量,定义函数,但就是…

数据结构-栈,队列

栈,队列 1 知识框架2 栈2.1 顺序栈2.2 链式栈 3 队列3.1 顺序队列3.2 循环队列3.3 链式队列 4 数组4.1 二维数组4.2 特殊数组的压缩存储 1 知识框架 2 栈 定义:只允许在一端进行插入或删除得到线性表 栈的数学性质:n个不同元素进栈&#xff…

phpWord使用模板填充数据:包含表格及嵌套表格(多个表格/循环表格)

参考文档 模板处理 基础使用 安装过程省略,首先加载模板: $templateProcessor new TemplateProcessor(ROOT_PATH . uploads/template/自动生成模板.docx); 完整保存流程 首先,要进行测试,起码能够写一个完整的demo,以下是我测…

实验三---面向对象分析与设计——UML用例图与活动图

一、实验目的: 掌握面向对象分析中用例建模的基本思想,学会识别参与者和用例,掌握UML用例图的绘制方法,学会编写用例说明;了解活动图的作用和组成元素,掌握UML活动图的绘制方法,学会使用活动图来…

秒懂算法 | KMP算法(Java描述)

Knuth-Morris-Pratt 算法(简称 KMP)是由高德纳(Donald Ervin Knuth)和沃恩普拉特在1974年构思,同年詹姆斯H莫里斯也独立地设计出该算法,最终三人于1977年联合发表。该算法较Brute-Force算法有较大改进&…

门电路OD门

漏极开路输出的门电路(OD门) 为了满足输出电平的变换,输出大负载电流,以及实现“线与”功能,将CMOS门电路的输出级做成漏极开路的形式,称为漏极开路输出的门电路,简称OD(Open&#x…

【JVM】1. JVM与Java体系结构

文章目录 1.1. 前言🍉1.2. 参考书目🍉1.3. Java及JVM简介🍉1.4. Java发展的重大事件🍉1.5. 虚拟机与Java虚拟机🍉1.6. JVM的整体结构🍉1.7. Java代码执行流程🍉1.8. JVM的架构模型🍉…

4. QT中的鼠标键盘事件 --- 鼠标拖拽案例

1. 说明 在QT的控件或者窗口当中,如果对于当前鼠标或者键盘的功能需要自己定义,可以重写父类当中对应虚函数,主要包括以下几个: //键盘按键按下 virtual void keyPressEvent(QKeyEvent *event); //键盘按键抬起 virtual void ke…

为什么C++这么复杂还不被淘汰?

C是一门广泛使用的编程语言,主要用于系统和应用程序的开发。尽管C具有一些复杂的语法和概念,但它仍然是编程界的重量级选手,在编程语言排行榜中一直位居前列。为什么C这么复杂还不被淘汰呢? C有以下优势 1、C具有高性能 C是一门编…

unity进阶学习笔记:photonServer测试

photonServer是由photon发布的一个网络框架,其封装了UDP和TCP通信机制让用户可以直接调用API实现网络游戏通信 1 photonServer下载安装 进入Photon官网的SDK选项,选择下载Server。目前Server版本已经更新到v5,这里我为了和教程保持一致下载…

Unittest接口测试生成报告和日志方法

HTML报告 直接把HTMLTestRunner.py放入工程目录即可报告脚本封装 #HTNL格式报告now datetime.datetime.now().strftime(%Y-%m-%d_%H_%M_%S)htmlreport reportpath "/" now r"result.html"print("测试报告生成地址:%s"% htmlre…

Revit干货 | 系统族、内建族、可载入族一次性搞清楚!

对于使用人数较多的revit软件,其中的许多概念与我们常用的CAD完全不同,以至于让许多工程师觉得revit软件有点高深莫测,不可琢磨,从而有了抗拒心理。 Revit软件中的重要概念: “族”是revit软件中的很重要也很基本的概念…