Golang标准库限流器rate使用

news2024/10/6 22:20:07

       限流就是限制系统的输入和输出流量来达到保护系统的目的,限流在实际场景中应用十分广泛,尤其在高并发场景下,为了保证系统的可以用性,我们需要采取一些限流措施降级,一旦达到限制的阈值,就需要限制流量并采取一些措施来完成限制流量的目的(比如:延迟处理、拒绝处理等),以防止过多的请求而导致系统崩溃。

在golang的标准库golang.org/x/time/rate有一个限流器的实现,这个限流器的实现方案是令牌桶。

 

1、令牌桶

令牌桶是比较常见的限流算法之一,如下图所示:

        令牌桶可以用来限制突发的流量,在下面图中,有一个桶,桶的大小是固定的,系统以一定的速率往桶中添加令牌,当桶满时,就会溢出,新添加的令牌会被丢弃。当请求需要被处理时,需要先从桶中获取令牌,当没有令牌可取时,则可以选择排队等待或者拒绝服务。

在这里插入图片描述

从上面的图看来,令牌桶的实现需要一个定时器和等待队列,定时器以一定的频率往桶中放入令牌,而等待队列用于存放等待的请求。但是这样的实现效率太低,在golang的标准库中的实现是通过计算时间的差值来算出令牌的。

 

2、标准库限流器的使用

标准库中的限流器相关定义如下:

Limit:速率,定义了某些事件的最大速率,为每秒事件数,也就是每秒往令牌桶中放入多少个令牌。Inf是无限速率。

type Limit float64

const Inf = Limit(math.MaxFloat64)

Every:这个函数可以将产生一个令牌的时间转化为每秒产生多少个令牌,比如100ms产生一个令牌,那么1s将产生10个令牌。

func Every(interval time.Duration) Limit {
	if interval <= 0 {
		return Inf
	}
	return 1 / Limit(interval.Seconds())
}

NewLimiter:创建一个限流器

func NewLimiter(r Limit, b int) *Limiter {
	return &Limiter{
		limit: r,
		burst: b,
	}
}

参数如下:

  • 第一个参数r Limit:产生令牌的速率,也就是每秒往桶中放入多少个令牌。
  • 第二个参数b int:令牌桶的大小。

对于下面这个例子,就是构造一个每秒产生10个令牌,令牌桶大小为20的限流器:

limiter := NewLimiter(10, 20)

 

2.1 消费令牌

Limiter提供了三种消费令牌的方法,可以用来消费一个或多个令牌,每种方法代表了当令牌不足时,各种的对应手段:

  • Wait / WaitN
  • Allow / AllowN
  • Reserve / ReserveN

 

Wait / WaitN

func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

Wait相当于WaitN(ctx, 1)

WaitN消费n个令牌,如果令牌的数量不够,将会阻塞等待。它的第一个参数为Context,也就是我们可以控制等待的最大时长。如果n超过了突发大小也就是桶的大小、Context被取消或者预期的等待时间超过了Context的Deadline,将会返回一个错误。

 

Allow / AllowN

func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool

Allow相当于AllowN(time.Now(), 1)

AllowN可以用来获取截至到某一时间,是否有n个令牌可用。如果满足则返回true,同时消费n个令牌,反之则不消费,返回false。

通常用于不满足条件则直接丢弃请求。

 

Reserve / ReserveN

func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

Reserve相当于ReserveN(time.Now(), 1)

ReserveN返回一个Reservation,Reservation可用用来指示在有n个令牌可用之前必须等待多长时间。

可用调用Reservation的Delay方法来获取需要等待的时间:

func (r *Reservation) Delay() time.Duration

或者调用Cancel来取消,该方法会将token归还:

func (r *Reservation) Cancel()

ReserveN的使用示例如下:

r := lim.ReserveN(time.Now(), 1)
if !r.OK() {
    // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
    return
}
time.Sleep(r.Delay())
Act()

Limiter支持动态设置速率以及桶的大小:

func (lim *Limiter) SetLimit(newLimit Limit)
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)

func (lim *Limiter) SetBurst(newBurst int)
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)

 

3、源码探究

3.1 Limit

Limit是令牌产生的速率

type Limit float64

它有两个方法,分别是一段时间内产生的令牌数量和产生一定的令牌数量所需的时间:

// 计算时间间隔d内产生的令牌数量
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	if limit <= 0 {
		return 0
	}
    // 时间间隔d内产生的令牌数量,产生的令牌数量 = 时间间隔 * 产生的速率  n = d * limit
	return d.Seconds() * float64(limit)
}

// 计算产生tokens个令牌所需的时间间隔
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	if limit <= 0 {
		return InfDuration
	}
    
    // 所需的时间 = 令牌数量 / 令牌产生速率
	seconds := tokens / float64(limit)
	return time.Duration(float64(time.Second) * seconds)
}

 

3.2 Limiter

Limiter的定义如下:

type Limiter struct {
	mu     sync.Mutex       // 锁,保证并发安全
	limit  Limit            // 令牌产生的速率,每秒产生多少个令牌
	burst  int              // 桶的大小,突发速率大小 
	tokens float64
	
	last time.Time         // tokens字段的最后一次更新时间
	
	lastEvent time.Time    // 速率限制事件的最新时间 
}

可以看到在Limiter的定义中并没有定时器,token数量的计算可以使用当前时间与最后一次的更新时间以及产生令牌的速率进行计算。

Reservation的定义如下:

Reservation用来保存限流器运行在一定时间延迟后发生的事件的信息:

type Reservation struct {
	ok        bool            // 事件是否可以发生
	lim       *Limiter        // 指向Limiter
	tokens    int             // 需要的token数量
	timeToAct time.Time       // 事件可以发生的时间点
	// This is the Limit at reservation time, it can change later.
	limit Limit              // 预定时的速率,后续可能会改变
}

首先来看Reserve和ReserveN的代码:

func (lim *Limiter) Reserve() *Reservation {
	return lim.ReserveN(time.Now(), 1)
}

func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
	r := lim.reserveN(now, n, InfDuration)
	return &r
}

最终都调用到了reserveN方法,lim.reserveN(now, n, InfDuration)返回截至到现在消费n个令牌的相关信息,最大等待时间为无限。

func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	defer lim.mu.Unlock()
	
    // 如果令牌产生速率是无限的,那么事件是可以直接发生的
	if lim.limit == Inf {
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
    // 如果令牌产生的速率为0,则要看桶中是否有足够的令牌,如果没有,事件则不可能发生    
	} else if lim.limit == 0 {
		var ok bool
		if lim.burst >= n {
			ok = true
			lim.burst -= n
		}
		return Reservation{
			ok:        ok,
			lim:       lim,
			tokens:    lim.burst,
			timeToAct: now,
		}
	}
	
    // 计算最新状态,tokens为截至现在令牌桶中令牌的数量
	now, last, tokens := lim.advance(now)

	// 计算请求产生后的剩余数量
	tokens -= float64(n)

	// 计算等待间隔
	var waitDuration time.Duration
    // 如果tokens < 0,说明令牌不够了,计算产生-tokens个令牌需要等待的时间
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// 只有需要的令牌数量小于桶的大小 而且 等待时间小于最大的等待时间,该事件才可以发生
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)   // 事件发生的事件,为现在时刻 + 需要等待的事件,如果令牌足够,就不需要等待
	}

	// 更新状态
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct     // lastEvent为最后一次事件发生的时间
	} else {
		lim.last = last
	}

	return r
}

// Advance计算并返回由于时间推移而产生的lim的更新状态
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
    // last为最后一个更新token的时间
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// elapsed为最后一次更新token的时间到现在的时间间隔
	elapsed := now.Sub(last)
    // 计算这个时间间隔内产生的令牌数量
	delta := lim.limit.tokensFromDuration(elapsed)
    // 计算当前时间令牌桶中令牌数量
	tokens := lim.tokens + delta
    // 令牌桶中令牌的最大数量为burst
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return now, last, tokens
}

Reserve和ReserveN最终都会返回一个Reservation,使用它可以获取事件发生需要等待的时间,如果等待的时间太长,也可以取消,取消则需要将令牌归还。

// OK返回限流器是否能够在最大等待时间内提供所请求的令牌数量。
func (r *Reservation) OK() bool {
	return r.ok
}

// 返回需要等待的时间
func (r *Reservation) Delay() time.Duration {
	return r.DelayFrom(time.Now())
}

// 取消
func (r *Reservation) Cancel() {
	r.CancelAt(time.Now())
}

func (r *Reservation) CancelAt(now time.Time) {
	if !r.ok {
		return
	}

	r.lim.mu.Lock()
	defer r.lim.mu.Unlock()
	
    // 如果r.timeToAct.Before(now) 也就是事件发生于现在之前,那么就没必要归还了,相当于事件已经发生了,直接返回
	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
		return
	}

	// 计算需要归还的令牌数量
	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
	if restoreTokens <= 0 {
		return
	}
	// 计算到现在的相关信息 
	now, _, tokens := r.lim.advance(now)
	// 计算新的桶中的token数量 
	tokens += restoreTokens
	if burst := float64(r.lim.burst); tokens > burst {
		tokens = burst
	}
	// 更新状态 
	r.lim.last = now
	r.lim.tokens = tokens
	if r.timeToAct == r.lim.lastEvent {
		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
		if !prevEvent.Before(now) {
			r.lim.lastEvent = prevEvent
		}
	}
}

 

接下来看WaitN的实现,WaitN的实现也使用到了reserveN,我们可以猜到,其实就是调用reserveN,然后判断是否需要等待,需要则等待Reservation.Delay()的时间即可:

func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
	lim.mu.Lock()
	burst := lim.burst
	limit := lim.limit
	lim.mu.Unlock()
	
    // 如果需要的令牌数大于桶的大小而且速率不是无限的,那么就返回err
	if n > burst && limit != Inf {
		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
	}
    
	// 检查ctx是否已经取消了
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
    
	// 计算最大的等待时间间隔
	now := time.Now()
	waitLimit := InfDuration
	if deadline, ok := ctx.Deadline(); ok {
		waitLimit = deadline.Sub(now)
	}
    
	// Reserve
	r := lim.reserveN(now, n, waitLimit)
	if !r.ok {
		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
	}
    
	// 获取需要等待的时间
	delay := r.DelayFrom(now)
    // 无需等待
	if delay == 0 {
		return nil
	}
    // 创建定时器,等待delay的时间
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		// We can proceed.
		return nil
	case <-ctx.Done():
		// Context was canceled before we could proceed.  Cancel the
		// reservation, which may permit other events to proceed sooner.
		r.Cancel()
		return ctx.Err()
	}
}

 

AllowN的实现同样用到了reserveN方法,这个方法只是简单的返回在一个时间点,需要n个令牌是否可以满足,最大的等待时间设置为了0:

func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/23109.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue-admin-template新增TagViews标签页功能,附完整代码

前言 vue-admin-template里面本身是没有TagViews标签页的&#xff0c;只有完整版的vue-element-admin才有&#xff0c;翻找网上的其他教程&#xff0c;要么代码不完整&#xff0c;要么有bug&#xff0c;本篇文章就教大家如何在vue-admin-template的基础上新增TagViews 步骤 …

分布式应用kafka + EFLFK集群部署

前言 Kafka是由Apache软件基金会开发的一个开源流处理平台&#xff0c;由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;它可以处理消费者在网站中的所有动作流数据。 这种动作&#xff08;网页浏览&#xff0c;搜索和其他用户的行动&#xff09;…

骨传导耳机优缺点有哪些?骨传导耳机科普与推荐

骨传导耳机是一种可以开放耳朵的耳机&#xff0c;所以对于耳朵比较敏感的人来说&#xff0c;这种耳机是比较友好的&#xff0c;同时因为它的佩戴方式&#xff0c;在运动圈内也很受欢迎。只不过骨传导耳机是一种新兴的耳机&#xff0c;所以很多人并不太了解它的优缺点。 我作为…

书店销售管理系统----数据库原理及应用综合实验

枯木逢春犹再发&#xff0c;人无两度再少年&#x1f342; 系统主要模块如下&#xff1a; &#xff08;1&#xff09; 书店销售管理系统设计与实现—图书入库管理及查询统计 图书入库管理&#xff1a;维护入库图书信息&#xff08;如图书编号、书名、作者、价格、图书分类、出版…

vue-element-admin后台前端解决方案(基于 vue 和 element-ui)

vue-element-admin后台前端解决方案参考文档下载安装目录结构参考文档 vue-element-admin官网&#xff0c;更多详细内容可以查看社区学习文档。 下载安装 可以把 vue-element-admin当做工具箱或者集成方案仓库&#xff0c;在 vue-admin-template 的基础上进行二次开发&#…

Java 8 给我们更好的消灭空指针解决方案

前言 大家好&#xff0c;在平时的业务开发中&#xff0c;空指针是我们经常遇到的问题&#xff0c; 他可能会导致我们的流程无法正常进行或者一些意外情况的发生。 这就是我们需要避免空指针的原因&#xff0c;那我们有哪些方式去解决这个问题呢&#xff1f; 空指针场景 包装…

Linux系统安装DB2数据库的详细步骤

1、DB2数据库的安装 一、将DB2的安装介质上传至/home目录&#xff0c;并解压&#xff1a; tar –zxvf v9.5fp3_linuxx64_server.tar.gz 二、执行LANGC 三、进入解压后的server目录&#xff08;cd server/&#xff09;&#xff0c;执行./db2setup,步骤如下&#xff1a; # cd…

数据结构-线性表与链性表(二)

目录 一、学习背景 二、简绍 三、线性表 一、什么是线性表 二、操作 1、插入 2、删除 3、查询 三、数组应用案例中源码分析 1、插入 2、删除 3、get与set 4、扩容 二、单向链表 单向链表结构 循环链表 三、数组和链表比较 1、时间复杂度角度 2、其他维度 3、…

【JS】原生js实现矩形框的绘制/拖动/缩放

1、要点及功能描述 通过js监听mouse事件来实现矩形框的绘制&#xff0c;再通过区分点击的是边角还是其他位置来实现矩形框的缩放和拖动&#xff0c;并且在拖动和缩放时&#xff0c;都做了边界限制&#xff0c;当缩放或拖动 到边界时&#xff0c;就不能继续拉缩放拖动了。当然在…

【个人简介】一枚在上海的AndroidiOSWindow逆向电子工程师

> Hello World!, I am Humenger 「 From Shanghai, China 」 「 Android Reverse engineer, applied electronic technology Shan Dong University, China 」 &#x1f41d;主要涉及平台: Android(70%),iOS(15%),Window(5%),macOS(3%),其他(7%) &#x1f98b;主要涉…

易基因|RNA m7G甲基化测序(m7G-MeRIP-seq)

N7-甲基鸟苷&#xff08;N7-methylguanosine&#xff0c;m7G&#xff09;是真核生物tRNA、rRNA和mRNA 5cap中最丰富的修饰之一。作为一种重要的表观遗传修饰&#xff0c;m7G RNA甲基化在基因表达、加工代谢、蛋白质合成、转录稳定等方面发挥着重要的作用&#xff0c;参与疾病发…

Pinely Round 1 (Div. 1 + Div. 2) E - Make It Connected思维分类讨论

昨晚的problem e 一直wa。因为答案&#xff0c;不唯一&#xff0c;调起来只能肉眼debug。被干emo了qwq。好在赛后看到 ugly2333的 思路和我差不多&#xff0c;最后还是要选取度数较小的最优, 好像从度数的角度出发&#xff0c;不容易wa。 题意&#xff1a; 给你一个图&#xf…

什么是组织孤岛?它会带来哪些影响?可以这样去对付它

作为一个在不同地点和时区与不同团队合作的远程工作者&#xff0c;我有过公平的孤岛经历。 是的&#xff0c;它们扼杀了任何组织的成长。那么&#xff0c;在使你&#xff08;和组织中的每个人&#xff09;失去生产力、困惑、自私和不快乐之后。 在这篇文章中&#xff0c;我将…

ADRV9009中armBinary反汇编IDA参数设置

armBinary.bin文件如果不做处理的话就是一堆16进制数,扔到IDA里也只是一堆有颜色的16进制数,需要进行一些参数设置。 1 选择IDA32位打开armBinary.bin文件 2 load a new file设置 Processor type选择ARM Little-endian [ARM],点击Edit ARM architecture options进行相应修…

Linux 中的内部命令和外部命令

Linux 中的内部命令和外部命令 作者&#xff1a;Grey 原文地址&#xff1a; 博客园&#xff1a;Linux 中的内部命令和外部命令 CSDN&#xff1a;Linux 中的内部命令和外部命令 什么是 bash shell ? bash shell&#xff0c;就是一个程序&#xff0c;就是 Linux 系统安装的…

漫谈 Java 平台上的反应式编程

反应式编程&#xff08;Reactive Programming&#xff09;是一套完整的编程体系&#xff0c;既有其指导思想&#xff0c;又有相应的框架和库的支持&#xff0c;并且在生产环境中有大量实际的应用。在支持度方面&#xff0c;既有大公司参与实践&#xff0c;也有强大的开源社区的…

【Linux】-- 开发工具yum、vim、gcc、g++、gdb、make、makefile使用介绍

目录 一、yum 1.了解yum &#xff08;1&#xff09;RPM &#xff08;2&#xff09;yum 2.yum使用 &#xff08;1&#xff09;查看软件包 &#xff08;2&#xff09;安装软件 &#xff08;3&#xff09;卸载软件 二.Linux编辑器-vim 1. vim概念 &#xff08;1&am…

flink集群搭建

1、安装包flink-1.10.0-bin-scala_2.11.tgz 2、tar -zxf flink-1.10.0-bin-scala_2.11.tgz 解压到指定目录 解压之后的文件名称是flink-1.10.0 3、flink-1.10.0的目录结构如下&#xff1a; bin/&#xff1a;flink的相关命令 conf/&#xff1a;flink的配置文件 examples/&a…

业务数据分析-Excel公式与函数(三)

目录 概念 运算符 地址的引用 逻辑函数 文本函数 统计函数 查找与引用函数 日期函数 常见出错信息 概念 公式&#xff1a;Excel的核心功能&#xff0c;功能强大 如果要定义的话&#xff0c;可以说是 以开头的&#xff0c;对地址进行引用的计算形式 说的高大上一点的…

方法2—并行数据流转换为一种特殊串行数据流模块的设计

并行数据流转换为一种特殊串行数据流模块的设计&#xff0c;设计两个可综合的电路模块1&#xff0c;第一个可综合模块&#xff0c;M1。2&#xff0c;描述M2模块3&#xff0c;描述M0模块的Verilog代码4&#xff0c;描述顶层模块5&#xff0c;电路生成的门级网表&#xff0c;netl…