封装一个细粒度的限流器

news2025/1/11 20:46:47

文章目录

  • 原因
  • 限流对象
  • 限流后的做法
  • 怎么确定限流阈值
    • 观测业务性能数据
    • 压测
    • 借鉴链路上的其他服务
    • 手动计算
  • 四种静态限流算法
    • 令牌桶
    • 漏桶
    • 固定窗口与滑动窗口
  • 手写限流算法
    • 令牌桶
    • 漏桶
    • 固定窗口
    • 滑动窗口
  • 分布式限流的具体实现

原因

尽管云原生网关里有统一入口的限流(根据ip、userID来控制),但是有的微服务需要有自己的限流策略(比如根据不同的算法任务、不同的子产品来限制),所以封装了一个限流器公共包,可以在多个微服务中复用这个功能。直接原因是有一次某个子功能流量激增,大量任务失败。

关键步骤:

  • 实现限流策略,例如基于令牌桶或漏桶
  • 配置和初始化,微服务启动时加载配置和初始化限流器

限流对象

针对ip限流,例如1s限制50个请求,考虑到公共ip的情况;
针对某个算法任务,不限制vip用户,普通用户1s限制创建10个创建任务的请求。

限流后的做法

目前的做法是限流了就直接拒绝,抛出错误提示,还有其他的做法:

  • 同步阻塞等待一段时间。如果是偶发性地触发了限流,那么稍微阻塞等待一会儿,后面就有极大的概率能得到处理。比如说限流设置为一秒钟 100 个请求,恰好来了 101 个请求。多出来的一个请求只需要等一秒钟,下一秒钟就会被处理。但是要注意控制住超时,也就是说你不能让人无限期地等待下去。

  • 在这里插入图片描述

  • 同步转异步。这里我们又一次看到了这个手段,它是指如果一个请求没被限流,那就直接同步处理;而如果被限流了,那么这个请求就会被存储起来,等到业务低峰期的时候再处理。这个其实跟降级差不多。(TODO 研究到降级时再过来看一下)

  • -调整负载均衡算法(用redis的话似乎跟负载均衡没关系,如果是在网关里做限流是可以调整负载均衡器的)。如果某个请求被限流了,那么就相当于告诉负载均衡器,应该尽可能少给这个节点发送请求。我在熔断里面给你讲过类似的方案。不过在熔断里面是负载均衡器后续不再发请求,而在限流这里还是会发送请求,只是会降低转发请求到该节点的概率。调整节点的权重就能达成这种效果。

  • 在这里插入图片描述

怎么确定限流阈值

观测业务性能数据

我们公司有完善的监控,所以我可以通过观测到的性能数据来确定阈值。比如说观察线上的数据,如果在业务高峰期整个集群的 QPS 都没超过 1000,那么就可以考虑将阈值设定在 1200,多出来的 200 就是余量。
不过这种方式有一个要求,就是服务必须先上线,有了线上的观测数据才能确定阈值。并且,整个阈值很有可能是偏低的。因为业务巅峰并不意味着是集群性能的瓶颈。如果集群本身可以承受每秒 3000 个请求,但是因为业务量不够,每秒只有 1000 个请求,那么我这里预估出来的阈值是显著低于集群真实瓶颈 QPS 的。

压测

不过我个人觉得,最好的方式应该是在线上执行全链路压测,测试出瓶颈。即便不能做全链路压测,也可以考虑模拟线上环境进行压测,再差也应该在测试环境做一个压力测试。

借鉴链路上的其他服务

不过如果真的做不了,或者来不及,或者没资源,那么还可以考虑参考类似服务的阈值。比如说如果 A、B 服务是紧密相关的,也就是通常调用了 A 服务就会调用 B 服务,那么可以用 A 已经确定的阈值作为 B 的阈值。又或者 A 服务到 B 服务之间有一个转化关系。比如说创建订单到支付,会有一个转化率,假如说是 90%,如果创建订单的接口阈值是 100,那么支付的接口就可以设置为 90。

手动计算

实在没办法了,就只能手动计算了。也就是沿着整条调用链路统计出现了多少次数据库查询、多少次微服务调用、多少次第三方中间件访问,如 Redis,Kafka 等。举一个最简单的例子,假如说一个非常简单的服务,整个链路只有一次数据库查询,这是一个会回表的数据库查询,根据公司的平均数据这一次查询会耗时 10ms,那么再增加 10 ms 作为 CPU 计算耗时。也就是说这一个接口预期的响应时间是 20ms。如果一个实例是 4 核,那么就可以简单用 1000ms÷20ms×4=200 得到阈值。

四种静态限流算法

令牌桶

系统会以一个恒定的速率产生令牌,这些令牌会放到一个桶里面,每个请求只有拿到了令牌才会被执行。每当一个请求过来的时候,就需要尝试从桶里面拿一个令牌。如果拿到了令牌,那么请求就会被处理;如果没有拿到,那么这个请求就被限流了。(当令牌桶已满时,新生成的令牌会被丢弃,不会增加桶中的令牌数量。)
你需要注意,本身令牌桶是可以积攒一定数量的令牌的。比如说桶的容量是 100,也就是这里面最多积攒 100 个令牌。那么当某一时刻突然来了 100 个请求,它们都能拿到令牌。
在这里插入图片描述

漏桶

漏桶是指当请求以不均匀的速度到达服务器之后,限流器会以固定的速率转交给业务逻辑。
某种程度上,你可以将漏桶算法看作是令牌桶算法的一种特殊形态。你将令牌桶中桶的容量设想为 0,就是漏桶了。
在这里插入图片描述
在这里插入图片描述
所以你可以看到,在漏桶里面,令牌产生之后你就需要取走,没取走的话也不会积攒下来。因此漏桶是绝对均匀的,而令牌桶不是绝对均匀的。

固定窗口与滑动窗口

固定窗口是指在一个固定时间段,只允许执行固定数量的请求。比如说在一秒钟之内只能执行 100 个请求。滑动窗口类似于固定窗口,也是指在一个固定时间段内,只允许执行固定数量的请求。区别就在于,滑动窗口是平滑地挪动窗口,而不像固定窗口那样突然地挪动窗口。假设窗口大小是一分钟。此时时间是 t1,那么窗口的起始位置是 t1-1 分钟。过了 2 秒之后,窗口大小依旧是 1 分钟,但是窗口的起始位置也向后挪动了 2 秒,变成了 t1 - 1 分钟 + 2 秒。这也就是滑动的含义。
在这里插入图片描述

手写限流算法

参考:
https://blog.csdn.net/z3551906947/article/details/140477024,并且里面阐述了各个算法的优缺点,漏桶是可以用来处理突发流量的。

令牌桶

package limiter

import (
    "sync"
    "time"
)

// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {
    capacity      int        // 容量
    currentTokens int        // 令牌数量
    rate          int        // 发放令牌速率/秒
    lastTime      time.Time  // 上次发放令牌时间
    mutex         sync.Mutex // 避免并发问题
}

// NewTokenBucketLimiter 创建一个新的令牌桶限流器实例。
func NewTokenBucketLimiter(capacity, rate int) *TokenBucketLimiter {
    return &TokenBucketLimiter{
       capacity:      capacity,
       rate:          rate,
       lastTime:      time.Now(),
       currentTokens: 0, // 初始化时桶中没有令牌
    }
}

// TryAcquire 尝试从令牌桶中获取一个令牌。
func (l *TokenBucketLimiter) TryAcquire() bool {
    l.mutex.Lock()
    defer l.mutex.Unlock()

    now := time.Now()
    interval := now.Sub(l.lastTime) // 计算时间间隔

    // 如果距离上次发放令牌超过 1/rate 秒,则发放新的令牌
    if float64(interval) >= float64(time.Second)/float64(l.rate) {
       // 计算应该发放的令牌数量,但不超过桶的容量
       newTokens := int(float64(interval)/float64(time.Second)* l.rate) 
       l.currentTokens = minInt(l.capacity, l.currentTokens+newTokens)

       // 更新上次发放令牌的时间
       l.lastTime = now
    }

    // 如果桶中没有令牌,则请求失败
    if l.currentTokens == 0 {
       return false
    }

    // 桶中有令牌,消费一个令牌
    l.currentTokens--

    return true
}

// minInt 返回两个整数中的较小值。
func minInt(a, b int) int {
    if a < b {
       return a
    }
    return b
}

func TestName(t *testing.T) {
    tokenBucket := NewTokenBucketLimiter(5, 10)
    for i := 0; i < 10; i++ {
        fmt.Println(tokenBucket.TryAcquire())
    }
    time.Sleep(100 * time.Millisecond)
    fmt.Println(tokenBucket.TryAcquire())
}

漏桶

package limiter

import (
	"fmt"
	"math"
	"sync"
	"testing"
	"time"
)

// LeakyBucketLimiter 漏桶限流器
type LeakyBucketLimiter struct {
	capacity     int        // 桶容量
	currentLevel int        // 当前水位
	rate         int        // 水流速度/秒
	lastTime     time.Time  // 上次放水时间
	mutex        sync.Mutex // 避免并发问题
}

// NewLeakyBucketLimiter 初始化漏桶限流器
func NewLeakyBucketLimiter(capacity, rate int) *LeakyBucketLimiter {
	return &LeakyBucketLimiter{
		capacity:     capacity,
		currentLevel: 0, // 初始化时水位为0
		rate:         rate,
		lastTime:     time.Now(),
	}
}

// TryAcquire 尝试获取处理请求的权限
func (l *LeakyBucketLimiter) TryAcquire() bool {
	l.mutex.Lock() // 直接获取写锁
	defer l.mutex.Unlock()

	// 如果上次放水时间距今不到 1/rate 秒,不需要放水
	now := time.Now()
	interval := now.Sub(l.lastTime)

	// 计算放水后的水位
	if float64(interval) >= float64(time.Second)/float64(l.rate) {
		l.currentLevel = int(math.Max(0, float64(l.currentLevel)-float64(interval)/float64(time.Second)*float64(l.rate)))
		l.lastTime = now
	}
	// 尝试增加水位
	if l.currentLevel < l.capacity {
		l.currentLevel++
		return true
	}
	return false
}

func TestName(t *testing.T) {
	tokenBucket := NewLeakyBucketLimiter(5, 10)
	for i := 0; i < 10; i++ {
		fmt.Println(tokenBucket.TryAcquire())
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Println(tokenBucket.TryAcquire())
}

固定窗口

package main

import (
	"sync"
	"time"
)

// FixedWindowRateLimiter 定义固定窗口限流器
type FixedWindowRateLimiter struct {
	mu           sync.Mutex
	maxRequests  int
	requestCount int
	window       time.Time // 窗口的起始点,每个窗口长度1s
}

// NewFixedWindowRateLimiter 创建一个新的固定窗口限流器实例
func NewFixedWindowRateLimiter(maxRequests int) *FixedWindowRateLimiter {
	return &FixedWindowRateLimiter{
		maxRequests: maxRequests,
		window:      time.Now().Truncate(time.Second),
	}
}

// TryAcquire 尝试获取请求许可
func (f *FixedWindowRateLimiter) TryAcquire() bool {
	f.mu.Lock()
	defer f.mu.Unlock()

	// 检查是否需要重置窗口
	if time.Now().After(f.window.Add(time.Second)) {
		f.requestCount = 0
		f.window = time.Now().Truncate(time.Second)
	}

	// 检查是否达到最大请求次数
	if f.requestCount >= f.maxRequests {
		return false
	}

	// 请求成功,递增计数器
	f.requestCount++
	return true
}

func main() {
	limiter := NewFixedWindowRateLimiter(5)
	for i := 0; i < 10; i++ {
		if limiter.TryAcquire() {
			fmt.Println("请求通过")
		} else {
			fmt.Println("请求被拒绝")
		}
		time.Sleep(100 * time.Millisecond)
	}
}

滑动窗口

package main

import (
	"sync"
	"time"
)

// SlidingWindowRateLimiter 定义滑动窗口限流器
type SlidingWindowRateLimiter struct {
	mu           sync.Mutex
	maxRequests  int
	windowSize  time.Duration // 窗口长度
	windows     []int
	windowIndex int
	currentTime time.Time //   上个滑窗的起始点
}

// NewSlidingWindowRateLimiter 创建一个新的滑动窗口限流器实例
func NewSlidingWindowRateLimiter(maxRequests int, windowSize time.Duration) *SlidingWindowRateLimiter {
	numWindows := int(windowSize.Seconds())
	return &SlidingWindowRateLimiter{
		maxRequests:  maxRequests,
		windowSize:   windowSize,
		windows:      make([]int, numWindows),
		currentTime:  time.Now().Truncate(time.Second),
		windowIndex:  0,
	}
}

// TryAcquire 尝试获取请求许可
func (s *SlidingWindowRateLimiter) TryAcquire() bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	// 更新当前时间
	currentTime := time.Now().Truncate(time.Second)

	// 检查是否需要更新窗口
	if currentTime.After(s.currentTime.Add(s.windowSize)) {
		s.currentTime = currentTime
		s.windowIndex = 0
	} else if currentTime.After(s.currentTime.Add(time.Second)) {
		s.windowIndex = (s.windowIndex + 1) % len(s.windows)
	}

	// 清除过期窗口
	for i := range s.windows {
		if currentTime.Before(s.currentTime.Add(time.Duration(i+1)*time.Second)) {
			break
		}
		s.windows[i] = 0
	}

	// 检查是否达到最大请求次数
	totalRequests := 0
	for _, count := range s.windows {
		totalRequests += count
	}
	if totalRequests >= s.maxRequests {
		return false
	}

	// 请求成功,递增计数器
	s.windows[s.windowIndex]++
	return true
}

func main() {
	limiter := NewSlidingWindowRateLimiter(5, 10*time.Second)
	for i := 0; i < 10; i++ {
		if limiter.TryAcquire() {
			fmt.Println("请求通过")
		} else {
			fmt.Println("请求被拒绝")
		}
		time.Sleep(100 * time.Millisecond)
	}
}

分布式限流的具体实现

从单机或者集群的角度看,可以分为单机限流或者集群限流。集群限流一般需要借助 Redis 之类的中间件来记录流量和阈值。换句话说,就是你需要用 Redis 等工具来实现前面提到的限流算法。当然如果是利用网关来实现集群限流,那么可以摆脱 Redis。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2040320.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

股指期货套期保值中的展期管理有哪些?

在复杂的金融市场环境中&#xff0c;展期作为一种重要的风险管理工具&#xff0c;被广泛应用于期货交易中&#xff0c;特别是当投资者需要对长期资产进行套期保值时。展期的核心思想在于&#xff0c;通过连续替换高流动性的近月期货合约来替代流动性较差的远月合约&#xff0c;…

untiy有渲染线程和逻辑线程嘛

之前我也这么认为&#xff0c;其实unity引擎是单线程的&#xff0c;当然后续的jobs不在考虑范围内 如果你在一个awake 或者 start方法中 延时&#xff0c;是会卡住主线程的 比如 其实游戏引擎有一个基础简单理解&#xff0c;那就是不断的进行一个循环&#xff0c;在这个周期循…

网络分段如何增强 OT 网络的可见性

防火墙在保护运营技术(OT) 网络和系统方面发挥什么作用&#xff1f; 很多人会说&#xff0c;防火墙是一种防御机制&#xff0c;用于保护该环境免受 IT 和外界的影响。对于负责该关键系统正常运行的操作员来说&#xff0c;防火墙是阻止他人进入的外围保护。它也是需要从 OT 系统…

Unity Render Streaming项目实践经验

UnityRenderStreaming项目 项目github地址见上,我使用项目的3.1.0-exp.7版本、Unity 2023.1.0版本、windows11运行。 1下载项目包 2在Unity Hub中打开RenderStreaming~文件夹 3在package manager中导入com.unity.renderstreaming package 因为已经下载过了就选择install pa…

Linux shell编程学习笔记71: sort 命令——构造有序世界

0 前言 在大数据时代&#xff0c;我们面对和使用大量数据&#xff0c;如果数据是有序的&#xff0c;无疑是有益的。 在Linux中&#xff0c;我们可以使用 sort 命令来构造一个有序的空间。 1 sort命令 的功能、格式和选项说明 我们可以使用命令sort --help来获取帮助信息。 …

Spark MLlib 特征工程(下)

Spark MLlib 特征工程(下) 前面我们提到&#xff0c;典型的特征工程包含如下几个环节&#xff0c;即预处理、特征选择、归一化、离散化、Embedding 和向量计算&#xff0c;如下图所示。 在上一讲&#xff0c;我们着重讲解了其中的前 3 个环节&#xff0c;也就是预处理、特征选…

Memecoin的火爆与AMM在Solana上的主导地位

随着Solana区块链的高速发展&#xff0c;尤其是近年来Memecoin市场的崛起&#xff0c;AMM&#xff08;自动做市商&#xff09;逐渐成为Solana去中心化交易所&#xff08;DEX&#xff09;的主导交易模式。尽管Solana以其高效性能能够支持每秒数千笔交易&#xff0c;足以让中心化…

【C语言篇】深入理解指针3(附转移表源码)

文章目录 数组指针什么是数组指针数组指针变量的初始化 二维数组传参的本质函数指针函数指针变量的创建函数指针变量的使用 两端有趣的代码typedef 关键字 函数指针数组转移表写在最后 数组指针 什么是数组指针 在【C语言篇】深入理解指针2我们学习了指针数组&#xff0c;指针…

Qt项目【上位机十字屏开发】

效果图 说明 重写 QWidget 中的 paintEvent() 处理绘图事件&#xff0c;废话不多说&#xff0c;上代码 源码 #ifndef MYWIDGETFORM_H #define MYWIDGETFORM_H#include <QWidget>namespace Ui { class myWidgetForm; }enum MYTYPE{SIZEWIDTH,SIZEHEIGHT,TOPWIDTH,TOPHE…

XMind在软件需求分析中编写测试用例的应用技巧

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言 在软件需求分析中&#xff0c;编写测试用例是确保软件质量的重要环节。之前很多同学都是用Excel&#xff0c;但是XMind作为一款功能强大的思维导图工具&#xff0c;可以在需求分析阶段帮助测试人员系统地设计和组织测试用…

报错解决——苹果电脑mac装windows10,总是提示“启动转换”安装失败:拷贝Windows安装文件时出错

报错原因&#xff1a; 所安装的镜像文件大于4GB。 解决办法一&#xff1a; 使用小于4GB的镜像文件。 参考文章&#xff1a; 安装小于4GB的windows系统镜像 小于4GB的windows10镜像下载&#xff1a; 系统库官网 解决办法二&#xff1a; 参考文章&#xff1a; Mac air装…

如何利用Maven命令使得本地 .jar 文件安装到本地仓库中,以供其他项目通过 Maven 依赖引用

文件夹打包 例如此时我的文件夹example当中有两个class文件 复制文件夹路径 cmd运行命令&#xff1a;jar cvf nation.jar -C 你的文件夹路径 . 以我的举例&#xff1a; 这样就完成了打包 导入仓库 先找到jar文件的位置&#xff0c;复制路径 并且确定自己有安装好maven命…

【概率统计】三扇门游戏(蒙提霍尔问题)

三扇门游戏 两种答案2/3的重选正确率1/2的重选正确率 正确答案 也称为蒙提霍尔问题&#xff08;Monty Hall problem&#xff09;&#xff1a; 有三扇门&#xff0c;其中只有一扇是正确的门&#xff0c;打开后将能获得一辆豪车。另外两扇门是错误选项&#xff0c;门内只有山羊。…

模板——从初级到进阶

目录 前言&#xff1a; 一、非类型模板参数 二、模板的特化 2.1 函数模板特化 2.2 类模板特化 2.2.1 全特化 2.2.2 偏特化 三、模板分离编译 3.1 什么是分离编译 3.2 模板的分离编译 四、模板总结 前言&#xff1a; 我们前面已经对初阶模板有了比较深刻的了解&#xff…

鸿蒙前端开发——工具安装与项目创建

工具安装 DevEco Studio https://developer.huawei.com/consumer/cn/ 直接下一步。 创建空项目 双击进入 空项目如下&#xff1a; 点击previewer进行预览 备用地址下载

十、OpenCVSharp 中的图像的几何变换

文章目录 简介一、平移1. 平移向量的定义和计算2. 平移操作的矩阵表示二、旋转1. 旋转角度的表示和计算2. 旋转中心的选择3. 旋转矩阵的推导和应用三、缩放1. 缩放因子的确定2. 缩放操作的数学模型3. 缩放过程中的图像插值方法(如最近邻插值、双线性插值、双三次插值)四、仿射…

Qt连接Postgres数据库

数据库相关代码可以看我这篇文章&#xff0c;今天要说的是驱动问题&#xff0c;网上很多说将Postgres/bin目录下的某些.dll文件拷贝到运行目录&#xff0c;实际测试的时候发现&#xff0c;还是加载不了驱动。 后来发现postgres可以直接下载相关的驱动依赖&#xff0c;将流程分…

计算机三级嵌入式笔记(五)——嵌入式系统的开发

目录 考点1 嵌入式系统的开发过程 考点2 嵌入式系统的开发平台与工具 考点3 嵌入式系统的调试 考点4 ADS1.2 工具软件 考点5 RVDS 考点6 GNU 考点7 基于嵌入式 Web 服务器的应用设计 23考纲 考点1 嵌入式系统的开发过程 (1)嵌入式系统的开发过程可以划分为系统需求分析与…

Golang | Leetcode Golang题解之第334题递增的三元子序列

题目&#xff1a; 题解&#xff1a; func increasingTriplet(nums []int) bool {n : len(nums)if n < 3 {return false}first, second : nums[0], math.MaxInt32for i : 1; i < n; i {num : nums[i]if num > second {return true} else if num > first {second n…

Ajax-01.原生方式

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Ajax-原生方式</title> </head> <!-…