Go singlefight 源码详解|图解

news2024/11/8 18:07:35

写在前面

通俗的来说就是 singleflight 将相同的并发请求合并成一个请求,进而减少对下层服务的压力,通常用于解决缓存击穿的问题。

在这里插入图片描述

详解

基础结构

golang.org/x/sync/singleflight singleflight结构体:

type call struct {
	wg sync.WaitGroup

	// 这些字段在 WaitGroup 结束前写入一次
	// 只有在 WaitGroup 结束后才会被读取。
	val interface{}
	err error

	// 这些字段在 WaitGroup 结束前使用 singleflight 互斥锁进行读写
	// 在 WaitGroup 结束后读取但不写入。
	dups  int
	chans []chan<- Result
}

Group 代表分成多个工作组,形成一个命名空间,在这个命名空间中,各工作单元可以重复执行。

type Group struct {
	mu sync.Mutex       // 互斥锁
	m  map[string]*call // 懒加载
}

Result 保存 Do 方法的结果,以便在通道上传递。做异步处理。

type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

简单demo

func TestSingleFightExample(t *testing.T) {
	var group singleflight.Group
	// 模拟一个并发请求
	for i := 0; i < 5; i++ {
		go func(i int) {
			key := "example"
			tmp := i // 将tmp放进去
			val, err, _ := group.Do(key, func() (interface{}, error) {
				// 模拟一次耗时操作
				time.Sleep(time.Second)
				return fmt.Sprintf("result_%d", tmp), nil
			})
			if err != nil {
				fmt.Println("Error:", err)
			}
			fmt.Println("Value:", val)
		}(i)
	}
	// 等待所有请求完成
	time.Sleep(3 * time.Second)
}

结果:这是一个很随机的过程,0~4都有可能,主要看哪个协程最先进来。

Value: result_2
Value: result_2
Value: result_2
Value: result_2
Value: result_2

Do 执行函数:对同一个 key 多次调用的时候,在第一次调用没有执行完的时候, 只会执行一次 fn,其他的调用会阻塞住等待这次调用返回, shared 表示fn的结果是否被共享

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

DoChan 和 Do 类似,只是 DoChan 返回一个 channel,也就是同步与异步的区别

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

Forget:用于通知 Group 删除某个 key 这样后面继续这个 key 的调用的时候就不会在阻塞等待了

func (g *Group) Forget(key string){
	g.mu.Lock()
	if c, ok := g.m[key]; ok {
		c.forgotten = true
	}
	delete(g.m, key)
	g.mu.Unlock()
}

singleflight的本质是对某次函数调用的复用,只执行1次,并将执行期间相同的函数返回相同的结果。由此产生一个问题,如果实际执行的函数出了问题,比如超时,则在此期间的所有调用都会超时,由此需要一些额外的方法来控制。

在一些对可用性要求极高的场景下,往往需要一定的请求饱和度来保证业务的最终成功率。一次请求还是多次请求,对于下游服务而言并没有太大区别,此时使用 singleflight 只是为了降低请求的数量级,那么使用 Forget() 提高下游请求的并发。

常见面试题

singleflight 是什么?什么时候用的?

缓存失效,合并请求的时候用的,这样我们就可以减少对DB的请求压力。
在这里插入图片描述

如果这个goruntine超时怎么办?

singleflight 内部使用 waitGroup 来让同一个 key 的除了第一个请求的后续所有请求都阻塞。直到第一个请求执行 func 返回后,其他请求才会返回。
这意味着,如果 func 执行需要很长时间,那么后面的所有请求都会被一直阻塞。
这时候我们可以使用 DoChan 结合ctx + select做超时控制

func TestSingleFightTimeout(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	go doFly(ctx)
	time.Sleep(2 * time.Second)
	cancel() // 2秒后超时
}

func doFly(ctx context.Context) {
	var g singleflight.Group
	key := "example"
	// 使用 DoChan 结合 select 做超时控制
	result := g.DoChan(key, func() (interface{}, error) {
		time.Sleep(5 * time.Second) // 模拟超时
		return "result", nil
	})
	select {
	case r := <-result:
		fmt.Println("r", r.Val)
	case <-ctx.Done():
		fmt.Println("done")
		return
	}
}

结果输出:

done

上述代码中,我们将主进程先sleep 2秒,然后再进行cancel,那么此时我们将会让DoChan这个方法 time.Sleep 5秒模拟超时。那么我们会发现函数过了2秒之后就会输出done

doChan方法具体是怎么实现的?

在DoChan方法中,有一个 go g.doCall(c, key, fn) 的操作,当一个 goroutine 来执行,并通过channel 来返回数据,这样外部可以自定义超时逻辑,防止因为 fn 的阻塞,导致大量请求都被阻塞。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok { // 如果没有这个key
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	c := &call{chans: []chan<- Result{ch}} // 构造异步返回结构体,可以接参数进行超时
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn) // 异步执行

	return ch
}

如果请求失败了怎么办?

如果第一个请求失败了,那么后续所有等待的请求都会返回同一个 error。但实际上可以根据下游能支撑的 rps 定时 forget 这个 key,让更多的请求能有机会走到后续逻辑。

go func() {
       time.Sleep(100 * time.Millisecond)
       g.Forget(key)
   }()

比如1秒内有100个请求过来,正常是第一个请求能执行queryDB,后续99个都会阻塞。增加这个 Forget 之后,每 100ms 就能有一个请求执行 queryDB,相当于是多了几次尝试的机会,相对的也给DB造成了更大的压力,需要根据具体场景进去取舍。 因为有可能前几次是因为DB的抖动导致的查询失败,重试之后就能实现了。

参考链接

[1] https://pkg.go.dev/golang.org/x/sync/singleflight
[2] https://www.lixueduan.com/posts/go/singleflight
[3] https://juejin.cn/post/7093859835694809125

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1809020.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

永磁同步电机双矢量模型预测转矩控制MPTC

导读&#xff1a;本期文章主要介绍永磁同步电机双矢量模型预测转矩控制。由于传统直接转矩控制和单矢量的模型预测转矩控制转矩纹波较大&#xff0c;且在全速范围内的开关频率不固定&#xff0c;针对这一缺陷&#xff0c;引入双矢量MPTC。 如果需要文章中的仿真模型&#xff0…

AI生成个性化壁纸

使用天工AI 将图片设置成桌面壁纸

transformer 位置编码源码解读

import torch import mathdef get_positional_encoding(max_len, d_model):"""计算位置编码参数&#xff1a;max_len -- 序列的最大长度d_model -- 位置编码的维度返回&#xff1a;一个形状为 (max_len, d_model) 的位置编码张量"""positional_e…

IDEA启动项目报java.lang.OutOfMemoryError: GC overhead limit exceeded

idea编译项目时报j ava.lang.OutOfMemoryError: GC overhead limit exceeded错误&#xff0c;教你两步搞定&#xff01; 第一步&#xff1a;打开help -> Edit Custom VM Options ,修改xms和xmx的大小&#xff0c;如下图&#xff1a; 第二步&#xff1a;File -> Settings…

素数的无穷大的证明

素数的无穷大——欧几里得的证明 文章目录 一、说明二、欧几里得证据三、哥德巴赫对素数无穷性的证明&#xff08;1730&#xff09;四、Frstenberg 对素数无穷性的证明(1955)五、库默尔对欧几里得证明的重述 一、说明 众所周知&#xff0c;素数是无限多的。然而&#xff0c;两…

使用 Sysbench 测试文件的读写速度

要使用 Sysbench 测试文件的读写速度&#xff0c;你可以按照以下步骤进行&#xff1a; 安装 Sysbench&#xff1a; 如果你还没有安装 Sysbench&#xff0c;可以通过以下命令在 Ubuntu 上安装&#xff1a; sudo apt install sysbench创建测试文件&#xff1a; 首先&#xff0c…

32-读取Excel数据(xlrd)

本篇介绍如何使在python中读取excel数据。 一、环境准备 先安装xlrd模块&#xff0c;打开cmd&#xff0c;输入 pip install xlrd 在线安装。 二、基本操作 import xlrd# 打开excel表格 data xlrd.open_workbook(test.xlsx)# 2.获取sheet表格 # 方式一&#xff1a;通过索引顺…

【Java基础】多线程开发

Java多线程编程学习笔记 Author: Jim.kk Video: Bilibili 文章目录 Java多线程编程学习笔记学习路线简介程序、进程与线程的关系JVM简介 | 多线程在JVM中的执行示例CPU 线程的调度方式多线程的意义并行与并发 创建多线程 1 | Thread 与 Runnable方式 1 | 继承 Thread 类方式 …

spool 管道 小文件 mknod

Spool File In SQL*PLUS in Multiple Small Files ? (Doc ID 2152654.1)​编辑To Bottom In this Document Goal Solution APPLIES TO: Oracle Database - Enterprise Edition - Version 10.2.0.1 to 12.1.0.2 [Release 10.2 to 12.1] Oracle Database Cloud Schema Service…

【机器学习】基于卷积LSTM的视频预测

1. 引言 1.1 LSTM是什么 LSTM&#xff08;Long Short-Term Memory&#xff09;是一种特殊的循环神经网络&#xff08;RNN&#xff09;变体&#xff0c;旨在解决传统RNN在处理长序列数据时遇到的梯度消失和梯度爆炸问题。LSTM通过引入门控机制和细胞状态的概念&#xff0c;使得…

表格状态码转换,其他索引串转成名字

1.问题分析 原数据库 关联指标为数字串的形式&#xff0c;每个小数对应的是另一张表index的属性&#xff0c;我们想知道对应指标的名称&#xff0c;怎么在这里下面这种形式呢&#xff1f; 两种思路&#xff1a; 1.修改在后端处理&#xff0c;把后端关联指标部分修改成图二的…

来腾讯第4天,我已经焦虑昏了啊!

大家好&#xff0c;我是白露啊。 今天在看到一个实习生在抱怨&#xff0c;给我笑惨了。 标题是&#xff1a;“腾讯实习第4天&#xff0c;焦虑昏了”&#xff01; 他写道&#xff1a;“怎么办啊牛爷爷们&#xff0c;什么都不会。业务看不懂&#xff0c;文档看不懂&#xff0c;…

el-table 实现表头置顶【干货满满】附源码

a)一般情况下&#xff0c;想要在 ElTable 上实现表头固定&#xff0c;滑动滚动条时希望表头常显&#xff0c;不被滚动条顶上去。这时候就需要给表格添加高度&#xff0c;但是这个高度需要提前确定好&#xff0c;不是很方便&#xff0c;表格上边一段距离不是固定的&#xff0c;常…

codeforce round951 div2

A guess the maximum 问题&#xff1a; 翻译一下就是求所有相邻元素中max - 1的最小值 代码&#xff1a; #include <iostream> #include <algorithm>using namespace std;const int N 5e4;int a[N]; int n;void solve() {cin >> n;int ans 0x3f3f3f3f;…

一个简单好用的 C# Easing Animation 缓动动画类库

文章目录 1.类库说明2.使用步骤2.1 创建一个Windows Form 项目2.2 安装类库2.3 编码2.4 效果 3. 扩展方法3.1 MoveTo 动画3.2 使用回调函数的Color动画3.3 属性动画3.4 自定义缓动函数 4.该库支持的内置缓动函数5.代码下载 1.类库说明 App.Animations 类库是一个很精炼、好用的…

RT-DETR 详解之查询去噪( DeNoise)

引言 前面我们已经讲解了RT-DETR的基本结构与Efficient Hybrid Encoder部分&#xff0c;在这篇博客里&#xff0c;博主将主要记录RT-DETR的第二个创新点&#xff1a;Uncertainty-minimal Query Selection 查询向量选择为什么重要&#xff1f; 关于 Query Selection&#xff0…

使用WGCLOUD监测进程的cpu和内存使用情况

WGCLOUD是一款国产免费的运维平台&#xff0c;可以监测很多指标数据&#xff0c;我们在这篇文章主要描述如何使用WGCLOUD监测进程 其实官网的进程使用描述也比较清楚&#xff0c;看看 进程应用、中间件监测使用说明&#xff08;对我们关注的业务系统、中间件、进程进行实时监测…

计网总结☞网络层

.................................................. 思维导图 ........................................................... 【Wan口和Lan口】 WAN口&#xff08;Wide Area Network port&#xff09;&#xff1a; 1)用于连接外部网络&#xff0c;如互联…

[chisel]马上要火的硬件语言,快来了解一下优缺点

Chisel是什么&#xff1f; Chisel的全称为Constructing Hardware In a Scala Embedded Language&#xff0c;是一个基于Scala的DSL&#xff08;Domain Specific Language&#xff0c;特定领域专用语言&#xff09;。2012年&#xff0c;加州大学伯克利分校&#xff08;UC Berkel…

大话设计模式解读02-策略模式

本篇文章&#xff0c;来解读《大话设计模式》的第2章——策略模式。并通过Qt和C代码实现实例代码的功能。 1 策略模式 策略模式作为一种软件设计模式&#xff0c;指对象有某个行为&#xff0c;但是在不同的场景中&#xff0c;该行为有不同的实现算法。 策略模式的特点&#…