【Go】-限流器的四种实现方法

news2024/12/23 7:40:01

目录

关于限流和限流器

固定窗口限流器

滑动窗口限流器

漏桶限流器

令牌桶限流器

总结


关于限流和限流器

        限流(Rate Limiting)是一种控制资源使用率的机制,通常用于防止系统过载和滥用。

        限流器(Rate Limiter)是一种实现限流的组件或服务。以下是限流器的一些主要用途和原因:

  1. 防止系统过载:在高流量的情况下,如网络服务或API,限流可以确保系统不会因为过多的并发请求而崩溃。通过限制每个用户或客户端的请求频率,系统可以维持稳定的性能。

  2. 资源分配:在多用户环境中,限流可以确保所有用户都公平地使用资源,防止某些用户占用过多资源而影响其他用户。

  3. 成本控制:对于云服务和API提供商来说,限流可以帮助控制成本。通过限制免费层级用户的使用量,可以鼓励用户升级到付费服务。

  4. 提高用户体验:如果一个服务因为过载而变得缓慢或不可用,限流可以提高用户体验,因为它可以保证服务在高负载下的响应速度。

  5. 防止滥用:限流可以防止恶意用户或自动化脚本滥用服务,例如防止暴力破解密码或发送垃圾邮件。

  6. 服务降级:在某些情况下,系统可能会故意降低服务质量,以保护关键功能的正常运行。限流是实现服务降级的一种方式。

  7. 合规性:某些行业法规可能要求服务提供商限制数据传输速率,以符合隐私和安全标准。

  8. 缓存友好:限流可以减少对缓存系统的冲击,因为缓存系统可能无法处理非常高的请求率。

81ea9992faabfd618afa1453cb28d4a


固定窗口限流器

        这种限流器一时间为周期,用一个计数器配合定时器,限制周期内访问的次数。

4f56e37260c0de9cc877e8d17bdb590

type FixedWindowRateLimiter struct {
     windowSize time.Duration
     limit      uint64
     counter    uint64
     ticker     *time.Ticker
     stop       chan struct{}
     status     bool
 }
 ​
 func NewFixedWindowRateLimiter(windowSize time.Duration, limit uint64) *FixedWindowRateLimiter {
     now := uint64(time.Now().UnixNano())
     return &FixedWindowRateLimiter{
        windowSize: windowSize,
        limit:      limit,
        start:      now,
        stop:       make(chan struct{}),
        status:     false,
     }
 }
  • windowSize限流器周期

  • limit最大访问次数限制

  • counter计数器

  • ticker计时器

  • stop关闭信号管道

  • status限流器状态

启动和关闭:

        Start启动定时器用go协程处理周期更新和收到关闭信号,Close向关闭信号管道发送关闭信号。

 func (fwrl *FixedWindowRateLimiter) Start() {
     fwrl.ticker = time.NewTicker(fwrl.windowSize)
     fwrl.status = true
     go func() {
         for {
             select {
             case <-fwrl.ticker.C:
                 atomic.StoreUint64(&fwrl.counter, 0)
             case <-fwrl.stop:
                 fwrl.ticker.Stop()
                 fwrl.status = false
                 return
             }
         }
     }()
 }
 ​
 func (fwrl *FixedWindowRateLimiter) Close() {
     close(fwrl.stop)
 }

实现的方法:

 // 请求一次访问
 func (fwrl *FixedWindowRateLimiter) Do() bool {
     if !fwrl.status {
         return false
     }
     currentCounter := atomic.LoadUint64(&fwrl.counter)
     if currentCounter >= fwrl.limit {
         return false
     }
     atomic.AddUint64(&fwrl.counter, 1)
     return true
 }
 ​
 // 剩余可访问次数
 func (fwrl *FixedWindowRateLimiter) Check() uint64 {
     if !fwrl.status {
         return 0
     }
     return fwrl.limit - atomic.LoadUint64(&fwrl.counter)
 }
 ​
 // 更新并重启限流器
 func (fwrl *FixedWindowRateLimiter) Update(windowSize time.Duration, limit uint64) {
     fwrl.windowSize = windowSize
     fwrl.limit = limit
     if fwrl.status {
         fwrl.ticker.Stop()
         fwrl.Start()
     }
 }

关于原子操作:

        在代码中使用了atomic包的原子操作,目的是为了保证高并发读写下限流器的数据准确性。atomic包的实现在sre/internal/runtime/atomic,本质上使用汇编语言保证了操作的原子性。

        例如swapUint64函数中,调用.Xchg64的代码如下:

TEXT ·Xchg64(SB), NOSPLIT, $0-24
     MOVD    ptr+0(FP), R0
     MOVD    new+8(FP), R1
 #ifndef GOARM64_LSE
     MOVBU   internal∕cpu·ARM64+const_offsetARM64HasATOMICS(SB), R4
     CBZ     R4, load_store_loop
 #endif
     SWPALD  R1, (R0), R2
     MOVD    R2, ret+16(FP)
     RET
 #ifndef GOARM64_LSE
 load_store_loop:
     LDAXR   (R0), R2
     STLXR   R1, (R0), R3
     CBNZ    R3, load_store_loop
     MOVD    R2, ret+16(FP)
     RET
 #endif

计数器限流的严重问题: 这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图:

img

        从上图中我们可以看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。

        我们刚才规定的是1分钟最多100个请求(规划的吞吐量),也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。

        用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。


滑动窗口限流器

        跟固定窗口限流器差不多,只不过粒度更小了。

ed2ba998132a08adbb203869895441c

 type SlidingWindowRateLimiter struct {
     windowSize time.Duration
     subWindow  time.Duration
     limit      uint64
     counters   []uint64
     start      uint64
     ticker     *time.Ticker
     subticker  *time.Ticker
     stop       chan struct{}
     status     bool
 }
 ​
 func NewSlidingWindowRateLimiter(windowSize, subWindow time.Duration, limit uint64) (*SlidingWindowRateLimiter, error) {
     if windowSize <= subWindow {
         return nil, errors.New("wrong size")
     }
     numSubWindows := int(windowSize / subWindow)
     return &SlidingWindowRateLimiter{
         windowSize: windowSize,
         subWindow:  subWindow,
         limit:      limit,
         counters:   make([]uint64, numSubWindows),
         start:      uint64(time.Now().UnixNano()),
         stop:       make(chan struct{}),
     }, nil
 }
  • windowSize限流器周期

  • subWindow滑动窗口周期

  • limit最大访问次数限制

  • counters滑动窗口计数器切片

  • start周期开启时间

  • ticker周期计时器

  • subticker窗口计时器

  • stop关闭信号管道

  • status限流器状态

启动和关闭:

        滑窗是小粒度的固定窗口,计算index并且重置计数

 func (swrl *SlidingWindowRateLimiter) Start() {
     swrl.ticker = time.NewTicker(swrl.windowSize)
     swrl.subticker = time.NewTicker(swrl.subWindow)
     swrl.status = true
     go func() {
         for {
             select {
             case <-swrl.subticker.C:
                 now := uint64(time.Now().UnixNano())
                 index := int((now - swrl.start) / uint64(swrl.subWindow))
                 atomic.StoreUint64(&swrl.counters[index%len(swrl.counters)], 0)
             case <-swrl.ticker.C:
                 swrl.start = uint64(time.Now().UnixNano())
             case <-swrl.stop:
                 swrl.ticker.Stop()
                 swrl.status = false
                 return
             }
         }
     }()
 }
 ​
 func (swrl *SlidingWindowRateLimiter) Close() {
     close(swrl.stop)
 }

其他方法:

        这里Do和Check都是要计算周期内每个滑窗总和

func (swrl *SlidingWindowRateLimiter) Do() bool {
     if !fwrl.status {
         return false
     }
     now := uint64(time.Now().UnixNano())
     startIndex := int((now - swrl.start) / uint64(swrl.subWindow))
     endIndex := int((now - swrl.start + uint64(swrl.windowSize)) / uint64(swrl.subWindow))
     // 计算周期内每个滑窗总和是否大于limit
     sum := uint64(0)
     for i := startIndex; i <= endIndex; i++ {
         index := i
         if index >= len(swrl.counters) {
             index -= len(swrl.counters)
         }
         sum += atomic.LoadUint64(&swrl.counters[index])
     }
     if sum >= swrl.limit {
         return false
     }
     // 增加当前子窗口的计数
     currentIndex := int((now - swrl.start) / uint64(swrl.subWindow))
     atomic.AddUint64(&swrl.counters[currentIndex], 1)
     return true
 }
 ​
 ​
 func (swrl *SlidingWindowRateLimiter) Check() uint64 {
     if !fwrl.status {
         return 0
     }
     now := uint64(time.Now().UnixNano())
     startIndex := int((now - swrl.start) / uint64(swrl.subWindow))
     endIndex := int((now - swrl.start + uint64(swrl.windowSize)) / uint64(swrl.subWindow))
     sum := uint64(0)
     for i := startIndex; i <= endIndex; i++ {
         index := i
         if index >= len(swrl.counters) {
             index -= len(swrl.counters)
         }
         sum += atomic.LoadUint64(&swrl.counters[index])
     }
     return swrl.limit - sum
 }
 ​
 func (swrl *SlidingWindowRateLimiter) Update(windowSize time.Duration, subWindow time.Duration, limit uint64) {
     swrl.windowSize = windowSize
     swrl.limit = limit
     swrl.subWindow = subWindow
     if swrl.status {
         swrl.ticker.Stop()
         numSubWindows := int(windowSize / swrl.subWindow)
         swrl.counters = make([]uint64, numSubWindows)
         swrl.Start()
     }
 }

漏桶限流器

        漏桶,可以想象成一个木桶下面钻了一个小孔,把水倒进来就是请求访问,水漏出去就是允许请求。

        理论上小孔流出水的速率是不变的,也就是允许请求的速率是不变的,这就是漏桶的特点,你可以随便倒水,但是流出水速率恒定,实现按了平稳的访问速率。

a93dc412009e69880b771bf38aac777

 type LeakyBucket struct {
     capacity  uint
     remaining uint
     ticker    *time.Ticker
     reset     time.Time
     rate      time.Duration
     mutex     sync.Mutex
     stop       chan struct{}
     status    bool
 }
 ​
 func NewLeakyBucket(capacity uint, rate time.Duration) (*LeakyBucket, error) {
     return &LeakyBucket{
         capacity:   capacity,
         remaining:  capacity,
         rate:       rate,
         status:     false,
     }
 }
  • capacity桶的容量

  • remaining剩余的容量

  • ticker计时器

  • reset重置的时间

  • rate漏桶的速率

  • mutes互斥锁

  • stop关闭信号管道

  • status桶的状态

启动和关闭:

 func (lb *LeakyBucket) Start() {
     lb.ticker = time.NewTicker(swrl.rate)
     lb.status = true
     lb.reset = time.Now().Add(rate)
     go func() {
         for {
             select {
             case <-lb.ticker.C:
                 lb.mutex.lock()
                 if lb.remaining < lb.capacity {
                     lb.remaining += 1
                 }
                 lb.mutex.unlock()
             case <-lb.stop:
                 lb.ticker.Stop()
                 lb.status = false
                 return
             }
         }
     }()
 }
 ​
 func (lb *LeakyBucket) Close() {
     close(lb.stop)
 }

其他方法:

// 返回容量
 func (lb *LeakyBucket) Capacity() uint {
     return lb.capacity
 }
 ​
 // 桶里剩余容量
 func (lb *LeakyBucket) Remaining() uint {
     return lb.remaining
 }
 ​
 // 重置桶
 func (lb *LeakyBucket) Reset() time.Time {
     lb.remaining = lb.capacity
     // 更新reset时间为一个rate后,这样就不用加锁了
     lb.reset = time.Now().Add(rate)
     return lb.reset
 }
 ​
 // 往桶里加请求
 func (lb *LeakyBucket) Add(amount uint) (bool, error) {
     lb.mutex.Lock()
     defer lb.mutex.Unlock()
     // 时间在重置前那就重置后再取
     if time.Now().After(lb.reset) {
         lb.reset = time.Now().Add(lb.rate)
         lb.remaining = lb.capacity
     }
     if amount > lb.remaining {
         return false, errors.New("too many")
     }
     lb.remaining -= amount
     return true, nil
 }

漏桶的问题:

        漏桶的出水速度固定,也就是请求放行速度是固定的,因此漏桶不能有效应对突发流量,但是能起到平滑突发流量(整流)的作用。

        漏桶出口的速度固定,不能灵活的应对后端能力提升,比如,通过动态扩容,后端流量从1000QPS提升到1WQPS,漏桶没有办法。


令牌桶限流器

        令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。 ​ 令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶。

a93dc412009e69880b771bf38aac777

type TookenBucket struct {
     capacity  uint
     tooken    uint
     ticker    *time.Ticker
     reset     time.Time
     rate      time.Duration
     mutex     sync.Mutex
     stop       chan struct{}
     status    bool
 }
 ​
 func NewTookenBucket(capacity uint, rate time.Duration) (*TookenBucket, error) {
     return &TookenBucket{
         capacity:   capacity,
         tooken:     0,
         rate:       rate,
         status:     false,
     }
 }
  • capacity桶的容量

  • tooken剩余的tooken

  • ticker计时器

  • reset重置的时间

  • rate漏桶的速率

  • mutes互斥锁

  • stop关闭信号管道

  • status桶的状态

启动和关闭:

 func (lb *TookenBucket) Start() {
     tb.ticker = time.NewTicker(swrl.rate)
     tb.status = true
     tb.reset = time.Now().Add(rate)
     go func() {
         for {
             select {
             case <-tb.ticker.C:
                 tb.mutex.lock()
                 if tb.tooken < lb.capacity {
                     tb.tooken += 1
                 }
                 tb.mutex.unlock()
             case <-tb.stop:
                 tb.ticker.Stop()
                 tb.status = false
                 return
             }
         }
     }()
 }
 ​
 func (tb *TookenBucket) Close() {
     close(tb.stop)
 }

其他方法:

 // 返回容量
 func (tb *TookenBucket) Capacity() uint {
     return tb.capacity
 }
 ​
 // 桶里tooken
 func (tb *TookenBucket) Cheak() uint {
     return tb.tooken
 }
 ​
 // 重置桶
 func (tb *TookenBucket) Reset() time.Time {
     tb.tooken = 0
     // 更新reset时间为一个rate后,这样就不用加锁了
     tb.reset = time.Now().Add(rate)
     return tb.reset
 }
 ​
 // 往桶里取令牌
 func (tb *TookenBucket) Took(amount uint) (bool, error) {
     tb.mutex.Lock()
     defer tb.mutex.Unlock()
     // 时间在重置前那就重置后再取
     if time.Now().After(tb.reset) {
         tb.reset = time.Now().Add(tb.rate)
         tb.token = 0
         return false, nil
     }
     if amount > tb.tooken {
         return false, errors.New("too many")
     }
     tb.tooken -= amount
     return true, nil
 }

总结

        以上就是四种限流方法实现,一般在高并发实战中,采用漏桶+令牌桶。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2264124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CTF_1

CTF_Show 萌新赛 1.签到题 <?php if(isset($_GET[url])){system("curl https://".$_GET[url].".ctf.show"); }else{show_source(__FILE__); }?> 和 AI 一起分析 1.if(isset($_GET[url]))检查GET请求中是否存在名为url的参数。 curl 2.curl…

[文献阅读] Unsupervised Deep Embedding for Clustering Analysis (无监督的深度嵌入式聚类)

文章目录 Abstract:摘要聚类深度聚类 KL散度深度嵌入式聚类(DEC)KL散度聚类软分配&#xff08;soft assignment&#xff09;KL散度损失训练编码器的初始化聚类中心的初始化 实验评估总结 Abstract: This week I read Unsupervised Deep Embedding for Clustering Analysis .It…

记录:virt-manager配置Ubuntu arm虚拟机

virt-manager&#xff08;Virtual Machine Manager&#xff09;是一个图形用户界面应用程序&#xff0c;通过libvirt管理虚拟机&#xff08;即作为libvirt的图形前端&#xff09; 因为要在Linux arm环境做测试&#xff0c;记录下virt-manager配置arm虚拟机的过程 先在VMWare中…

使用C语言编写UDP循环接收并打印消息的程序

使用C语言编写UDP循环接收并打印消息的程序 前提条件程序概述伪代码C语言实现编译和运行C改进之自由设定端口注意事项在本文中,我们将展示如何使用C语言编写一个简单的UDP服务器程序,该程序将循环接收来自指定端口的UDP消息,并将接收到的消息打印到控制台。我们将使用POSIX套…

Spring Boot 教程之三十六:实现身份验证

如何在 Spring Boot 中实现简单的身份验证&#xff1f; 在本文中&#xff0c;我们将学习如何使用 Spring设置和配置基本身份验证。身份验证是任何类型的安全性中的主要步骤之一。Spring 提供依赖项&#xff0c;即Spring Security&#xff0c;可帮助在 API 上建立身份验证。有很…

什么样的LabVIEW控制算自动控制?

自动控制是指系统通过预先设计的算法和逻辑&#xff0c;在无人工干预的情况下对被控对象的状态进行实时监测、决策和调整&#xff0c;达到预期目标的过程。LabVIEW作为一种图形化编程工具&#xff0c;非常适合开发自动控制系统。那么&#xff0c;什么样的LabVIEW控制算作“自动…

GFPS扩展技术原理(七)-音频切换消息流

音频切换消息流 Seeker和Provider通过消息流来同步音频切换能力&#xff0c;触发连接做切换&#xff0c;获取或设置音频切换偏好&#xff0c;通知连接状态等等。为此专门定义了音频切换消息流Message Group 为0x07&#xff0c;Message codes如下&#xff1a; MAC of Audio s…

视频直播点播平台EasyDSS与无人机技术的森林防火融合应用

随着科技的飞速发展&#xff0c;无人机技术以其独特的优势在各个领域得到了广泛应用&#xff0c;特别是在森林防火这一关键领域&#xff0c;EasyDSS视频平台与无人机技术的融合应用更是为传统森林防火手段带来很大的变化。 一、无人机技术在森林防火中的优势 ‌1、快速响应与高…

机器人路径规划和避障算法matlab仿真,分别对比贪婪搜索,最安全距离,RPM以及RRT四种算法

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1贪婪搜索算法原理 4.2最安全距离算法原理 4.3RPM 算法原理 4.4 RRT 算法原理 5.完整程序 1.程序功能描述 机器人路径规划和避障算法matlab仿真,分别对比贪婪搜索,最安全距离,RPM以及R…

【论文笔记】Visual Alignment Pre-training for Sign Language Translation

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Visual Alignment Pre-tra…

【附源码】Electron Windows桌面壁纸开发中的 CommonJS 和 ES Module 引入问题以及 Webpack 如何处理这种兼容

背景 在尝试让 ChatGPT 自动开发一个桌面壁纸更改的功能时&#xff0c;发现引入了一个 wallpaper 库&#xff0c;这个库的入口文件是 index.js&#xff0c;但是 package.json 文件下的 type:"module"&#xff0c;这样造成了无论你使用 import from 还是 require&…

Apache解析漏洞(apache_parsingCVE-2017-15715)

apache_parsing 到浏览器中访问网站 http://8.155.8.239:81/ 我们写一个木马 1.php.jpg 我们将写好的木马上传 会得到我们上传文件的路径 我们访问一下 发现上传成功 发现木马运行成功&#xff0c;接下来使用蚁剑连接我们的图片马 获取 shell 成功 CVE-2013-454 我们还是到…

C++-----函数与库

数学中的函数与编程中的函数对比 数学中的函数 - 数学函数是一种映射关系&#xff0c;例如&#xff0c;函数\(y f(x)x^{2}\)&#xff0c;对于每一个输入值\(x\)&#xff0c;都有唯一确定的输出值\(y\)。它侧重于描述变量之间的数量关系&#xff0c;通常通过公式来表示这种关系…

带着国标充电器出国怎么办? 适配器模式(Adapter Pattern)

适配器模式&#xff08;Adapter Pattern&#xff09; 适配器模式适配器模式&#xff08;Adapter Pattern&#xff09;概述talk is cheap&#xff0c; show you my code总结 适配器模式 适配器模式&#xff08;Adapter Pattern&#xff09;是面向对象软件设计中的一种结构型设计…

SKETCHPAD——允许语言模型生成中间草图,在几何、函数、图算法和游戏策略等所有数学任务中持续提高基础模型的性能

概述 论文地址&#xff1a;https://arxiv.org/pdf/2406.09403 素描是一种应用广泛的有效工具&#xff0c;包括产生创意和解决问题。由于素描能直接传达无法用语言表达的视觉和空间信息&#xff0c;因此从古代岩画到现代建筑图纸&#xff0c;素描在世界各地被用于各种用途。儿童…

初等函数整理

1.幂函数 2.指数函数 3.对数函数

【C/C++】手搓项目中常用小工具:日志、sqlit数据库、Split切割、UUID唯一标识

每日激励&#xff1a;“不设限和自我肯定的心态&#xff1a;I can do all things。 — Stephen Curry” 绪论​&#xff1a; 本章将写到一些手搓常用工具&#xff0c;方便在项目中的使用&#xff0c;并且在手搓的过程中一些函数如&#xff1a;日志 宏中的__VA_ARGS__接收可变参…

路径规划之启发式算法之二十一:狼群算法(Wolf Pack Algorithm,WPA)

狼群算法(Wolf Pack Algorithm,WPA)是一种模拟狼群捕食行为及其猎物分配方式的群体智能优化算法。它由吴虎胜等人在2013年提出,算法采用了基于人工狼主体的自下而上的设计方法和基于职责分工的协作式搜索路径结构。它通过抽象狼群搜索、围攻以及更新换代的三种行为方式来实…

Linux下基于最新稳定版ESP-IDF5.3.2开发esp32s3入门任务创建【入门二】

继上一篇的hello world&#xff1a; 【Linux下基于最新稳定版ESP-IDF5.3.2开发esp32s3入门hello world输出【入门一】-CSDN博客】 这一篇我们开始任务的创建。 工程还是用上一篇的hello world作为模板&#xff0c;hello world就不再app_main函数中输出&#xff0c;改成在任务…

用音乐与自我对话 ——澄迈漓岛音乐节x草台回声

四季循环&#xff0c;昼夜往复&#xff0c;在相对恒定的日常中&#xff0c;音乐是扇打量世界又内观本心的双向窗户。难以描述的触动&#xff0c;透过音乐语言转换为温热且真实的吟唱&#xff0c;一次又一次记录与释放。 除却生浪主舞台中的声音玩具乐队以及STOLEN秘密行动&…