介绍golang限流库以及漏桶与令牌桶的实现原理

news2025/1/15 23:00:00

RateLimit 限流中间件

前言

为什么需要限流中间件?

在大数据量高并发访问时,经常会出现服务或接口面对大量的请求而导致数据库崩溃的情况,甚至引发连锁反映导致整个系统崩溃。或者有人恶意攻击网站,大量的无用请求出现会导致缓存穿透的情况出现。使用限流中间件可以在短时间内对请求进行限制数量,起到降级的作用,从而保障了网站的安全性。

应对大量并发请求的策略?

  1. 使用消息中间件进行统一限制(降速)
  2. 使用限流方案将多余请求返回(限流)
  3. 升级服务器
  4. 负载均衡升级
  5. 等等

可以看出在代码已经无法提升的情况下,只能去提升硬件水平。或者改动架构再加一层!也可以使用消息中间件统一处理。而结合看来,限流方案是一种既不需要大幅改动也不需要高额开销的策略。

常见的限流方案

  1. 令牌桶算法
  2. 漏桶算法
  3. 滑动窗口算法
  4. 等等

这里主要根据golang的库介绍令牌桶和漏桶的实现原理以及在实际项目中如何应用。

漏桶

引入ratelimit库

go get -u go.uber.org/ratelimit

库函数源代码

// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
	return newAtomicBased(rate, opts...)
}

// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
	// TODO consider moving config building to the implementation
	// independent code.
	config := buildConfig(opts)
	perRequest := config.per / time.Duration(rate)
	l := &atomicLimiter{
		perRequest: perRequest,
		maxSlack:   -1 * time.Duration(config.slack) * perRequest,
		clock:      config.clock,
	}

	initialState := state{
		last:     time.Time{},
		sleepFor: 0,
	}
	atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
	return l
}

该函数使用了函数选项模式多个结构体对象进行初始化

首先根据传入的值来初始化一个桶结构体 rateint传参 (time.Duration(rate)单位为纳秒 = 1/1e9秒)

初始化过程中包括了

  • 每一滴水需要的时间 perquest = config.per / time.Duration(rate)

  • maxSlack 宽松度(宽松度为负值)-1 * time.Duration(config.slack) * perRequest 松紧度是用来规范等待时间的

// Clock is the minimum necessary interface to instantiate a rate limiter with
// a clock or mock clock, compatible with clocks created using
// github.com/andres-erbsen/clock.
type Clock interface {
   Now() time.Time
   Sleep(time.Duration)
}

同时还需要结构体Clock来记录当前请求的时间now和此刻的请求所需要花费等待的时间sleep

type state struct {
   last     time.Time
   sleepFor time.Duration
}

state 主要用来记录上次执行的时间以及当前执行请求需要花费等待的时间(作为中间状态记录)

最重要的Take逻辑

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
   var (
      newState state
      taken    bool
      interval time.Duration
   )
   for !taken {
      now := t.clock.Now()

      previousStatePointer := atomic.LoadPointer(&t.state)
      oldState := (*state)(previousStatePointer)

      newState = state{
         last:     now,
         sleepFor: oldState.sleepFor,
      }

      // If this is our first request, then we allow it.
      if oldState.last.IsZero() {
         taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
         continue
      }
      // 计算是否需要进行等待取水操作
      newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔)
       
       // 如果等待取水时间特别小,就需要松紧度进行维护
      if newState.sleepFor < t.maxSlack {
         newState.sleepFor = t.maxSlack
      }
       // 如果等待时间大于0,就进行更新
      if newState.sleepFor > 0 {
         newState.last = newState.last.Add(newState.sleepFor)
         interval, newState.sleepFor = newState.sleepFor, 0
      }
      taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
   }
   t.clock.Sleep(interval)
   // 最后返回需要等待的时间
    return newState.last
}

实现一个Take方法

  1. 该Take方法会进行原子性操作(可以理解为加锁和解锁),在大量并发请求下仍可以保证正常使用。
  2. 记录下当前的时间 now := t.clock.Now()
  3. oldState.last.IsZero() 判断是不是第一次取水,如果是就直接将state结构体中的值进行返回。而这个结构体中初始化了上次执行时间,如果是第一次取水就作为当前时间直接传参。
  4. 如果 newState.sleepFor 非常小,就会出现问题,因此需要借助宽松度,一旦这个最小值比宽松度小,就用宽松度对取水时间进行维护。
  5. 如果newState.sleepFor > 0 就直接更新结构体中上次执行时间newState.last = newState.last.Add(newState.sleepFor)并记录需要等待的时间interval, newState.sleepFor = newState.sleepFor, 0
  6. 如果允许取水和等待操作,那就说明没有发生并发竞争的情况,就模拟睡眠时间t.clock.Sleep(interval)。然后将取水的目标时间进行返回,由服务端代码来判断是否打回响应或者等待该时间后继续响应。

t.clock.Sleep(interval)

func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }

实际上在一个请求来的时候,限流器就会进行睡眠对应的时间,并在睡眠后将最新取水时间返回。

实际应用(使用Gin框架)

func ratelimit1() func(ctx *gin.Context) {
	r1 := rate1.New(100)
	return func(ctx *gin.Context) {
		now := time.Now()
		//  Take 返回的是一个 time.Duration的时间
		if r1.Take().Sub(now) > 0 {
			// 返回的时间比当前的时间还大,说明需要进行等待
			// 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
			// 如果不需要等待请求时间,就直接进行Abort 然后返回
			response(ctx, http.StatusRequestTimeout, "rate1 limit...")
			fmt.Println("rate1 limit...")
			ctx.Abort()
			return
		}
		// 放行
		ctx.Next()
	}
}

这里你可以进行选择是否返回。因为Take一定会执行sleep函数,所以当执行take结束后表示当前请求已经接到了水。当前演示使用第一种情况。

  • 如果你的业务要求响应不允许进行等待。那么可以在该请求接完水之后然后,如上例。

  • 如果你的业务允许响应等待,那么该请求等待对应的接水时间后进行下一步。具体代码就是将if中的内容直接忽略。(建议使用)

测试代码

这里定义了一个响应函数和一个handler函数方便测试

func response(c *gin.Context, code int, info any) {
   c.JSON(code, info)
}

func pingHandler(c *gin.Context) {
   response(c, 200, "ping ok~")
}

执行go test -run=Run -v先开启一个web服务

func TestRun(t *testing.T) {
   r := gin.Default()

   r.GET("/ping1", ratelimit1(), pingHandler)
   r.GET("/ping2", ratelimit2(), helloHandler)

   _ = r.Run(":4399")
}

使用接口压力测试工具go-wrk进行测试->tsliwowicz/go-wrk: go-wrk)

golang install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest下载

使用帮助

   Usage: go-wrk <options> <url>
   Options:
    -H       Header to add to each request (you can define multiple -H flags) (Default )
    -M       HTTP method (Default GET)
    -T       Socket/request timeout in ms (Default 1000)
    -body    request body string or @filename (Default )
    -c       Number of goroutines to use (concurrent connections) (Default 10)
    -ca      CA file to verify peer against (SSL/TLS) (Default )
    -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
    -d       Duration of test in seconds (Default 10)
    -f       Playback file name (Default <empty>)
    -help    Print help (Default false)
    -host    Host Header (Default )
    -http    Use HTTP/2 (Default true)
    -key     Private key file name (SSL/TLS (Default )
    -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
    -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
    -no-vr   Skip verifying SSL certificate of the server (Default false)
    -redir   Allow Redirects (Default false)
    -v       Print version details (Default false)

-t 8个线程 -c 400个连接 -n 模拟1k次请求 -d 替换-n 表示连接时间

输入go-wrk -t=8 -c=400 -n=1000 http://127.0.0.1:4399/ping1

可以稍微等待一下水流积攒否则一个请求也不一定能够响应。
在这里插入图片描述

可以看出,89个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就应该对请求放行处理。限流器实现的比较纯粹!

令牌桶

引入ratelimit

go get -u github.com/juju/ratelimit

初始化

// NewBucket returns a new token bucket that fills at the
// rate of one token every fillInterval, up to the given
// maximum capacity. Both arguments must be
// positive. The bucket is initially full.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
   return NewBucketWithClock(fillInterval, capacity, nil)
}

// NewBucketWithClock is identical to NewBucket but injects a testable clock
// interface.
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
   return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}

进行Bucket桶的初始化。

/ NewBucketWithQuantumAndClock is like NewBucketWithQuantum, but
// also has a clock argument that allows clients to fake the passing
// of time. If clock is nil, the system clock will be used.
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
   if clock == nil {
      clock = realClock{}
   }
    // 填充速率
   if fillInterval <= 0 {
      panic("token bucket fill interval is not > 0")
   }
    // 最大令牌容量
   if capacity <= 0 {
      panic("token bucket capacity is not > 0")
   }
    // 单次令牌生成量
   if quantum <= 0 {
      panic("token bucket quantum is not > 0")
   }
   return &Bucket{
      clock:           clock,
      startTime:       clock.Now(),
      latestTick:      0,
      fillInterval:    fillInterval,
      capacity:        capacity,
      quantum:         quantum,
      availableTokens: capacity,
   }
}

令牌桶初始化过程,初始化结构体 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量

调用

// TakeAvailable takes up to count immediately available tokens from the
// bucket. It returns the number of tokens removed, or zero if there are
// no available tokens. It does not block.
func (tb *Bucket) TakeAvailable(count int64) int64 {
   tb.mu.Lock()
   defer tb.mu.Unlock()
   return tb.takeAvailable(tb.clock.Now(), count)
}

调用TakeAvailable函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。

内部实现

// takeAvailable is the internal version of TakeAvailable - it takes the
// current time as an argument to enable easy testing.
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
   // 如果需要取出的令牌数小于等于零,那么就返回0个令牌
    if count <= 0 {
      return 0
   }
    // 根据时间对当前桶中令牌数进行计算
   tb.adjustavailableTokens(tb.currentTick(now))
    // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
   if tb.availableTokens <= 0 {
      return 0
   }
    // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
   if count > tb.availableTokens {
      count = tb.availableTokens
   }
    // 调整令牌数
   tb.availableTokens -= count
   return count
}
调整令牌
// adjustavailableTokens adjusts the current number of tokens
// available in the bucket at the given time, which must
// be in the future (positive) with respect to tb.latestTick.
func (tb *Bucket) adjustavailableTokens(tick int64) {
   lastTick := tb.latestTick
   tb.latestTick = tick
    // 如果当前令牌数大于最大等于容量,直接返回最大容量
   if tb.availableTokens >= tb.capacity {
      return
   }
    // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
   tb.availableTokens += (tick - lastTick) * tb.quantum
    // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
   if tb.availableTokens > tb.capacity {
      tb.availableTokens = tb.capacity
   }
   return
}

实现原理

  1. 加锁 defer 解锁
  2. 判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0
  3. 调用函数adjustTokens 获取可用的令牌数量,该函数实现原理:
    1. 如果当前令牌数大于最大等于容量,直接返回最大容量
    2. 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
    3. 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
  4. 如果当前可以取出的令牌数小于等于0 直接返回 0
  5. 如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数
  6. 当前的令牌数 -= 取出的令牌数(count)
  7. 返回 count

额外介绍

take函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。

func (tb *Bucket) Take(count int64) time.Duration

takeMaxDuration函数,可以根据最大等待时间来进行判断。

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。

测试

func ratelimit2() func(ctx *gin.Context) {
	// 生成速率 最大容量
	r2 := rate2.NewBucket(time.Second, 200)
	return func(ctx *gin.Context) {
		//r2.Take() // 允许欠账,令牌不够也可以接收请求
		if r2.TakeAvailable(1) == 1 {
			// 如果想要取出1个令牌并且能够取出,就放行
			ctx.Next()
			return
		}
		response(ctx, http.StatusRequestTimeout, "rate2 limit...")
		ctx.Abort()
		return
	}
}

在这里插入图片描述

由于压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!

小结

令牌桶可以允许自己判断请求是否继续,不用进行睡眠。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。
个人用令牌桶还是多的,也可能是我对漏桶源码的解析有误,没有看到相关的点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/191361.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spark JDBC采用分区读取数据库时partitionColumn, lowerBound, upperBound, numPartitions参数理解

partitionColumn是应该用于确定分区的列。 lowerBound并upperBound确定要获取的值的范围。完整数据集将使用与以下查询对应的行&#xff1a; SELECT * FROM table WHERE partitionColumn BETWEEN lowerBound AND upperBound numPartitions确定要创建的分区数。lowerBound和之间…

Unicode 和 UTF-8 详解

结论 Unicode 是 字符集 UTF-8 是 编码规则 字符集&#xff1a;为每一个字符分配唯一的ID&#xff08;如 SCII 码&#xff09; 编码规则&#xff1a;将 码位转换为字节序列的规则 背景 老规矩&#xff0c;我们用图文并茂的方式来讲解&#xff1a; ASCII 这个字符集 由于仅能…

[Android Studio] Android Studio设置杂项

&#x1f7e7;&#x1f7e8;&#x1f7e9;&#x1f7e6;&#x1f7ea; Android Debug&#x1f7e7;&#x1f7e8;&#x1f7e9;&#x1f7e6;&#x1f7ea; Topic 发布安卓学习过程中遇到问题解决过程&#xff0c;希望我的解决方案可以对小伙伴们有帮助。 &#x1f4cb;笔记目…

【HBase高级】3. HBase批量装载——Bulk load(1)Bulk load简介与案例介绍

2. HBase批量装载——Bulk load 2.1 简介 很多时候&#xff0c;我们需要将外部的数据导入到HBase集群中&#xff0c;例如&#xff1a;将一些历史的数据导入到HBase做备份。我们之前已经学习了HBase的Java API&#xff0c;通过put方式可以将数据写入到HBase中&#xff0c;我们…

MyBatis(三)使用MyBatis完成CRUD(增删改查)

准备工作 1、创建module&#xff08;Maven的普通Java模块&#xff09;&#xff1a;mybatis-002-crud 2、pom.xml 打包方式jar依赖&#xff1a;mybatis依赖mysql驱动依赖junit依赖logback依赖3、mybatis-config.xml放在类的根路径下 4、CarMapper.xml放在类的根路径下 5、lo…

redis的完整学习

Redis 1.Nosql 单机mysql缓存机制分库分表水平拆分mysql集群&#xff1a;本质上是数据库的读写 MyISAM:表锁&#xff0c;效率低Innodb&#xff1a;行锁 特点 解耦&#xff01; 1.方便扩展 2.大数据量高性能 3.数据类型是多样型的&#xff08;不需要设计数据库&#xff…

c语言 预处理

int main() {//printf("%s\n", __FILE__);//打印所在文件夹位置//printf("%d\n", __LINE__);//打印当前所在行号//printf("%s\n", __DATE__);//打印当前系统日期//printf("%s\n", __TIME__);//时间//printf("%s\n", __FUNCT…

分享155个ASP源码,总有一款适合您

ASP源码 分享155个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 154个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/12oYeESSXJCd32n463LBt4w?pwd5i1n 提取码&#x…

Java线程池中的execute和submit

一、概述 execute和submit都是线程池中执行任务的方法。 execute是Executor接口中的方法 public interface Executor {void execute(Runnable command); }submit是ExecuteService接口中的方法。 public interface ExecutorService extends Executor {<T> Future<T…

vue+element模仿腾讯视频电影网站(二),增加视频播放详情页

一.前言 1. 本项目在线预览&#xff1a;点击访问 2. 作者其他博客成品汇总预览&#xff1a;点击访问 3. 接上一篇&#xff1a;《vueelement模仿腾讯视频电影网站》 暂时源码并没有提供其他获取渠道&#xff0c;私聊作者获取即可&#xff0c;或通过博客后面名片添加作者&#…

【SSM】Mybatis小技巧汇总

Mybatis技巧一&#xff1a;#{} 和 ${} 的区别使用 ${} 特例一&#xff08;排序&#xff09;使用 ${} 特例二&#xff08;表连接&#xff09;使用 ${} 特例三&#xff08;批量删除&#xff09;技巧二&#xff1a;typeAliases 别名机制别名 Alias 性质技巧三&#xff1a;mappersm…

串级PID控制原理-1

串级计算机控制系统的典型结构如图1所示&#xff0c;系统中有两个PID控制器&#xff0c;Gc2(s)称为副调节器传递函数&#xff0c;包围Gc2(s)的内环称为副回路。Gc1(s)称为主调节器传递函数&#xff0c;包围Gc1(s)的外环称为主回路。主调节器的输出控制量u1作为副回路的给定量R2…

Vuex基本概念

一、基本概念vuex&#xff1a;为了解决不关联的组件整个网站状态数据共享问题&#xff0c;专为Vue.js开发的状态管理模式。采用集中式存储管理应用的所有组件状态&#xff0c;并以相应的规则保证状态以一种可预测的方式发生变化。vuex有5个主要成员&#xff1a;state&#xff1…

DAMA数据管理知识体系指南之数据架构管理

第4章 4.1 简介 数据架构管理是定义和维护如下规范的过程&#xff1a; 提供标准的、通用的业务术语/辞典。 表达战略性的数据需求。 为满足如上需求&#xff0c;概述高层次的整合设计。 使企业战略和相关业务架构相一致。 数据架构是用于定义数据需求、指导对数据资产的整合和…

【C++】从0到1入门C++编程学习笔记 - 提高编程篇:STL常用容器(vector容器)

文章目录一、vector基本概念二、vector构造函数三、vector赋值操作四、vector容量和大小五、vector插入和删除六、vector数据存取七、vector互换容器八、vector预留空间一、vector基本概念 功能&#xff1a; vector数据结构和数组非常相似&#xff0c;也称为单端数组 vector…

Discord多账号抢白名单,如何避免账号关联被封号?

相信玩NFT项目的都不会对Discord陌生&#xff0c;现在NFT的项目都会开Discord伺服器&#xff0c;并且将内容公告在上面、在伺服器里互动&#xff0c;所以如果你想参与NFT的世界&#xff0c;学会使用Discord是一件非常重要的事情。 东哥前2天也出了关于discord如何使用、如何抢白…

很多网站、APP 前段时间一下都变灰了。 先来感受一下变灰后的效果。

很多网站、APP 前段时间一下都变灰了。 先来感受一下变灰后的效果。 这种灰色的效果怎么实现的呢&#xff1f;如何做到图片、文字、按钮都变灰的效果呢&#xff1f; 方案 1&#xff0c;换一套灰色的 UI&#xff0c;那显然成本太大了&#xff0c;用脚指头想一想就知道不太可能…

C语言---选择排序和堆排序

文章目录前言一、简单选择排序1.简介2.算法思路3.代码实现二、堆排序1.简介2.算法思路3.代码实现总结前言 堆排序是选择排序的一种&#xff0c;今天我们讲解一下堆排序和简单选择排序 一、简单选择排序 1.简介 选择排序&#xff08;Selection sort&#xff09;是一种简单直观…

ZoomCharts JavaScript 1.20.2 Crack

深入探索数据 令人惊叹的数据可视化方式 - 这里是 ZoomCharts JavaScript 图表的不同交互可能性和功能。 内容向下钻取和向上钻取 深入研究特定数据点或获得更大的图景。通过放大或缩小与图表进行物理交互&#xff0c;浏览不同的数据级别。 数据过滤 选择一个或多个数据点查看具…

【软件测试面试】他凭什么能在面试中狂揽10个offer?

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 小高&#xff1a; 记…