go-zero 是如何实现令牌桶限流的?

news2024/11/24 6:48:23

原文链接:

上一篇文章介绍了 如何实现计数器限流?主要有两种实现方式,分别是固定窗口和滑动窗口,并且分析了 go-zero 采用固定窗口方式实现的源码。

但是采用固定窗口实现的限流器会有两个问题:

  1. 会出现请求量超出限制值两倍的情况
  2. 无法很好处理流量突增问题

这篇文章来介绍一下令牌桶算法,可以很好解决以上两个问题。

工作原理

算法概念如下:

  • 令牌以固定速率生成;
  • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行;
  • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。

令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。

源码实现

源码分析我们还是以 go-zero 项目为例,首先来看生成令牌的部分,依然是使用 Redis 来实现。

// core/limit/tokenlimit.go

// 生成 token 速率
script = `local rate = tonumber(ARGV[1])
// 通容量
local capacity = tonumber(ARGV[2])
// 当前时间戳
local now = tonumber(ARGV[3])
// 请求数量
local requested = tonumber(ARGV[4])
// 需要多少秒才能把桶填满
local fill_time = capacity/rate
// 向下取整,ttl 为填满时间 2 倍
local ttl = math.floor(fill_time*2)
// 当前桶剩余容量,如果为 nil,说明第一次使用,赋值为桶最大容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
if last_tokens == nil then
    last_tokens = capacity
end

// 上次请求时间戳,如果为 nil 则赋值 0
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
if last_refreshed == nil then
    last_refreshed = 0
end

// 距离上一次请求的时间跨度
local delta = math.max(0, now-last_refreshed)
// 距离上一次请求的时间跨度能生成的 token 数量和桶内剩余 token 数量的和
// 与桶容量比较,取二者的小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 判断请求数量和桶内 token 数量的大小
local allowed = filled_tokens >= requested
// 被请求消耗掉之后,更新剩余 token 数量
local new_tokens = filled_tokens
if allowed then
    new_tokens = filled_tokens - requested
end

// 更新 redis token
redis.call("setex", KEYS[1], ttl, new_tokens)
// 更新 redis 刷新时间
redis.call("setex", KEYS[2], ttl, now)

return allowed`

Redis 中主要保存两个 key,分别是 token 数量和刷新时间。

核心思想就是比较两次请求时间间隔内生成的 token 数量 + 桶内剩余 token 数量,和请求量之间的大小,如果满足则允许,否则则不允许。

限流器初始化:

// A TokenLimiter controls how frequently events are allowed to happen with in one second.
type TokenLimiter struct {
    // 生成 token 速率
    rate           int
    // 桶容量
    burst          int
    store          *redis.Redis
    // 桶 key
    tokenKey       string
    // 桶刷新时间 key
    timestampKey   string
    rescueLock     sync.Mutex
    // redis 健康标识
    redisAlive     uint32
    // redis 健康监控启动状态
    monitorStarted bool
    // 内置单机限流器
    rescueLimiter  *xrate.Limiter
}

// NewTokenLimiter returns a new TokenLimiter that allows events up to rate and permits
// bursts of at most burst tokens.
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
    tokenKey := fmt.Sprintf(tokenFormat, key)
    timestampKey := fmt.Sprintf(timestampFormat, key)

    return &TokenLimiter{
        rate:          rate,
        burst:         burst,
        store:         store,
        tokenKey:      tokenKey,
        timestampKey:  timestampKey,
        redisAlive:    1,
        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
    }
}

其中有一个变量 rescueLimiter,这是一个进程内的限流器。如果 Redis 发生故障了,那么就使用这个,算是一个保障,尽量避免系统被突发流量拖垮。

提供了四个可调用方法:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *TokenLimiter) Allow() bool {
    return lim.AllowN(time.Now(), 1)
}

// AllowCtx is shorthand for AllowNCtx(ctx,time.Now(), 1) with incoming context.
func (lim *TokenLimiter) AllowCtx(ctx context.Context) bool {
    return lim.AllowNCtx(ctx, time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
    return lim.reserveN(context.Background(), now, n)
}

// AllowNCtx reports whether n events may happen at time now with incoming context.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowNCtx(ctx context.Context, now time.Time, n int) bool {
    return lim.reserveN(ctx, now, n)
}

最终调用的都是 reverveN 方法:

func (lim *TokenLimiter) reserveN(ctx context.Context, now time.Time, n int) bool {
    // 判断 Redis 健康状态,如果 Redis 故障,则使用进程内限流器
    if atomic.LoadUint32(&lim.redisAlive) == 0 {
        return lim.rescueLimiter.AllowN(now, n)
    }

    // 执行限流脚本
    resp, err := lim.store.EvalCtx(ctx,
        script,
        []string{
            lim.tokenKey,
            lim.timestampKey,
        },
        []string{
            strconv.Itoa(lim.rate),
            strconv.Itoa(lim.burst),
            strconv.FormatInt(now.Unix(), 10),
            strconv.Itoa(n),
        })
    // redis allowed == false
    // Lua boolean false -> r Nil bulk reply
    if err == redis.Nil {
        return false
    }
    if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
        logx.Errorf("fail to use rate limiter: %s", err)
        return false
    }
    if err != nil {
        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
        // 如果有异常的话,会启动进程内限流
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    code, ok := resp.(int64)
    if !ok {
        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    // redis allowed == true
    // Lua boolean true -> r integer reply with value of 1
    return code == 1
}

最后看一下进程内限流的启动与恢复:

func (lim *TokenLimiter) startMonitor() {
    lim.rescueLock.Lock()
    defer lim.rescueLock.Unlock()

    // 需要加锁保护,如果程序已经启动了,直接返回,不要重复启动
    if lim.monitorStarted {
        return
    }

    lim.monitorStarted = true
    atomic.StoreUint32(&lim.redisAlive, 0)

    go lim.waitForRedis()
}

func (lim *TokenLimiter) waitForRedis() {
    ticker := time.NewTicker(pingInterval)
    // 更新监控进程的状态
    defer func() {
        ticker.Stop()
        lim.rescueLock.Lock()
        lim.monitorStarted = false
        lim.rescueLock.Unlock()
    }()

    for range ticker.C {
        // 对 redis 进行健康监测,如果 redis 服务恢复了
        // 则更新 redisAlive 标识,并退出 goroutine
        if lim.store.Ping() {
            atomic.StoreUint32(&lim.redisAlive, 1)
            return
        }
    }
}

以上就是本文的全部内容,如果觉得还不错的话欢迎点赞转发关注,感谢支持。


参考文章:

  • https://juejin.cn/post/7052171117116522504
  • https://www.infoq.cn/article/Qg2tX8fyw5Vt-f3HH673

推荐阅读:

  • 如何实现计数器限流?
  • go-zero 是如何做路由管理的?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/859942.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

断续模式(DCM)与连续模式(CCM)

断续模式(DCM)与连续模式(CCM)是开关电源最常用的两种工作模式。当初级开关管导通前,初级绕组还存在能量,不完全传递到次级,这种情况就叫连续模式。若初级绕组能量完全传递到次级,则为断续模式。…

Linux与安卓安全对抗

导读大家都知道安卓是基于Linux内核,而且大家也知道Linux的安全性是公认的,那为什么和Linux有着类似嫡系关系的安卓却一直被人诟病不安全呢?要想说清楚这个问题,我们需要了解一下安卓和Linux到底是什么关系,而且这两个…

中国信通院高质量数字化转型产品及服务全景图发布,合合信息多项AI产品入选

随着5G、人工智能、大数据等新一代技术的发展,企业在商业竞争中正面临更多不确定性。中国信通院高度关注企业数字化转型中遇到的痛点,发起“铸基计划-高质量数字化转型行动”,链接企业数字化转型供、需两侧的发展需求,以期推动国家…

MySQL—缓存

目录标题 为什么要有Buffer Poolbuffer pool有多大buffer pool缓存什么 如何管理Buffer Pool如何管理空闲页如何管理脏页如何提高缓存命中率预读失效buffer pool污染 脏页什么时候会被刷入到磁盘 为什么要有Buffer Pool 虽然说MySQL的数据是存储在磁盘中,但是也不能…

C++——缺省参数

缺省参数的定义 缺省参数是声明或定义函数时为函数的参数指定一个缺省值。在调用该函数的时候&#xff0c;如果没有指定实参&#xff0c;则采用该形参的缺省值&#xff0c;否则使用指定的实参。 void Func(int a 0) {cout << a << endl; } int main() { Func()…

【C++学习手札】new和delete看这一篇就够了!

​ 食用指南&#xff1a;本文在有C基础的情况下食用更佳 &#x1f340;本文前置知识&#xff1a; C类 ♈️今日夜电波&#xff1a; Prover—milet 1:21 ━━━━━━️&#x1f49f;──────── 4:01 …

OI易问卷协助企业服务好员工,收集员工反馈与信息

OI易问卷——企业问卷调查工具 OI易问卷&#xff0c;是群硕专为企业打造&#xff0c;对内服务员工的调查问卷。 集成于办公联合创新平台&#xff0c;并进一步帮助客户实现与微信或企业微信等其他平台的对接。 可以有效促进员工服务数字化&#xff0c;提高各部门工作效率&…

mysql的相关指令

mysql的相关指令 DML 数据操作语言DQL数据查询 mysql -uroot -p //启动数据库 show databases; //查看有哪些数据库 use 数据库名; //使用某个数据库 show tables; //查看数据库内有哪些表 exit; //退出mysql的命令环境 create database 数据库名称 charset utf8; //创建数据…

四项代表厂商,Kyligence 入选 Gartner 数据及人工智能相关领域多项报告

近日&#xff0c;全球权威的技术研究与咨询公司 Gartner 发布了《2023 年中国数据、分析及人工智能技术成熟度曲线》、《2023 年分析与商业智能技术成熟度曲线报告》、《2023 年数据管理技术成熟度曲线报告》&#xff0c;Kyligence 分别入选这三项报告的指标平台 Metrics Store…

【Git】 git push origin master Everything up-to-date报错

hello&#xff0c;我是索奇&#xff0c;可以叫我小奇 git push 出错&#xff1f;显示 Everything up-to-date 那么看看你是否提交了message 下面是提交的简单流程 git add . git commit -m "message" git push origin master 大多数伙伴是没写git commit -m "…

二维码查分系统制作方法大公开:用这个方法,你也可以快速拥有

自从“双减”政策颁布以来&#xff0c;学校对成绩的公布变得更加重视。尤其是小学年级&#xff0c;将成绩信息从分数制改为等级制进行发布。同时&#xff0c;成绩公布的方式也有了新的规定&#xff1a;禁止公开公布&#xff0c;不允许为学生成绩进行排名&#xff0c;并需要以特…

Java课题笔记~ ServletConfig

概念&#xff1a;代表整个web应用&#xff0c;可以和程序的容器(服务器)来通信 <?xml version"1.0" encoding"UTF-8"?> <web-app xmlns"http://java.sun.com/xml/ns/javaee"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instan…

突破笔试:力扣129. 求根节点到叶节点数字之和

1. 题目链接&#xff1a;129. 求根节点到叶节点数字之和 给你一个二叉树的根节点 root &#xff0c;树中每个节点都存放有一个 0 到 9 之间的数字。每条从根节点到叶节点的路径都代表一个数字&#xff1a;例如&#xff0c;从根节点到叶节点的路径 1 -> 2 -> 3 表示数字 …

操作系统 -- 进程间通信

一、概述 进程经常需要与其他进程通信。例如&#xff0c;在一个shell管道中&#xff0c;第一个进程的输出必须传送给第二个进程&#xff0c;这样沿着管道传递下去。因此在进程之间需要通信&#xff0c;而且最好使用一种结构良好的方式&#xff0c;不要使用中断。在下面几节中&…

day3 STM32 GPIO口介绍

GPIO接口简介 通用输入输出接口GPIO是嵌入式系统、单片机开发过程最常用的接口&#xff0c;用户可以通过编程灵活的对接口进行控制&#xff0c;实现对电路板上LED、数码管、按键等常用设备控制驱动&#xff0c;也可以作为串口的数据收发管脚&#xff0c;或AD的接口等复用功能使…

ElasticSearch:项目实战(2)

ElasticSearch: 项目实战 (1) 需求&#xff1a; 新增文章审核通过后同步数据到es索引库 1、文章服务中添加消息发送方法 在service层文章新增成功后&#xff0c;将数据通过kafka消息同步发送到搜索服务 Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;/…

【go语言学习笔记】04 Go 语言工程管理

文章目录 一、质量保证1. 单元测试1.1 定义1.2 Go 语言的单元测试1.3 单元测试覆盖率 2. 基准测试2.1 定义2.2 Go 语言的基准测试2.3 计时方法2.4 内存统计2.5 并发基准测试2.6 基准测试实战 3. 特别注意 二、性能优化1. 代码规范检查1.1 定义1.2 golangci-lint1.2.1 安装1.2.2…

网络基础2(HTTP,HTTPS,传输层协议详解)

再谈协议 在之前利用套接字进行通信的时候&#xff0c;我们都是利用 “字符串” 进行流式的发送接收&#xff0c;但是我们平常进行交流通信肯定不能只是简单的发送字符串。 比如我们用QQ进行聊天&#xff0c;我们不仅需要得到对方发送的消息&#xff0c;还要知道对方的昵称&…

Gitee+Jenkins(docker版)自动推送并部署Springboot项目到远程服务器

如果要参考gitlab配置请参考GitlabWebhook自动推送并更新Springboot项目 Gitlab的配置部分 环境介绍 Jenkins服务器(Centos7.6): docker安装的jenkins,参考Jenkins(docker安装)部署Springboot项目JDK1.8Maven3.6.3 注意docker安装的jenkins,而且是较新的版本,所以jenkins容器…

[足式机器人]Part4 机械设计 Ch00/01 绪论+机器结构组成与连接 ——【课程笔记】

本文仅供学习使用 本文参考&#xff1a; 《机械设计》 王德伦 马雅丽课件与日常作业可登录网址 http://edu.bell-lab.com/manage/#/login&#xff0c;选择观摩登录&#xff0c;查看2023机械设计2。 机械设计-Ch00Ch01——绪论机器结构组成与连接 Ch00-绪论0.1 何为机械设计——…