Redisson限流算法

news2025/2/25 2:58:46

引入依赖

<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.12.3</version>
</dependency>

建议版本使用3.15.5以上

使用

这边写了一个demo示例,定义了一个叫 “LIMITER_NAME” 的限流器,设置每1秒钟生成3个令牌。

 public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);

        String key = "test:rateLimiter01";
        RRateLimiter rateLimiter = redisson.getRateLimiter(key);
        boolean trySetRate = rateLimiter.trySetRate(RateType.OVERALL,  3, 1, RateIntervalUnit.SECONDS);
        for (int i = 0; i < 30; i++) {
            boolean b = rateLimiter.tryAcquire(1,100, TimeUnit.MILLISECONDS);
            System.out.println(Thread.currentThread().getName() + " testRate第" + (i + 1) + "次:" + b);
        }

//        redisson.shutdown();
    }

在这里插入图片描述

redis的数据结构

PRateLimiter接口的实现类几乎都在RedissonRateLimiter上,我们看看前面调用PRateLimiter方法时,这些方法的对应源码实现。

接下来下面就着重讲讲限流算法中,一共用到的3个redis key。

key1 Hash结构

就是前面trySetRate设置的hash key。按照之前限流器命名“LIMITER_NAME”,这个名字就是LIMITER_NAME。一共有3个值。

1,rate:代表速率
2,interval:代表多少时间内产生的令牌
3,type:代表时单机还是集群。
在这里插入图片描述

key 2: Zset结构

zset记录获取令牌的时间戳,用于时间对比,redis key的名字是{LIMITER_NAME}:permits。下面讲讲zset中每个元素的member和score

  • member:包含两个内容,
    1)一段8位随机字符串,为了唯一标志性当次获取令牌
    2)数字,即当次获取令牌的数量。不过这些都是压缩后存储在redis中的,在工具上看时会发现乱码。
  • score:记录获取令牌的时间戳,eg:1709026371728 转成日期是2024-02-27 17:32:51
    在这里插入图片描述

key 3 string 结构

记录当前令牌桶中剩余的令牌数。redis key的名字是{LIMITER_NAME}:value。
在这里插入图片描述

算法源码分析

trySetRate尝试设置

尝试设置是:当没有对应的key的时候设置,如果已经有值了,就不做任何处理。对应实现类中的源码是:

    /**
     * Initializes RateLimiter's state and stores config to Redis server.
     * 
     * @param mode - rate mode
     * @param rate - rate
     * @param rateInterval - rate time interval
     * @param rateIntervalUnit - rate time interval unit
     * @return {@code true} if rate was set and {@code false}
     *         otherwise
     */
    boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);


 @Override
    public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
        return get(trySetRateAsync(type, rate, rateInterval, unit));
    }

    @Override
    public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
              + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
              + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",
                Collections.singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal());
    }

核心是lua脚本,摘出来如下:

redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);

发现基于一个hash类型的redis key设置了3个值。
不过这里的命令是hsetnx,redis hsetnx命令用于哈希表中不存在的字段赋值。
如果哈希表不存在,一个新的哈希表被创建并进行hset操作。
如果字段已经存在hash表中,操作无效。
如果key不存在,一个新的哈希表被创建并执行hsetnx命令。

这意味着,这个方法只能做配置初始化,如果后期想要修改配置参数,该方法并不会生效。我们来看看另外一个方法。

setRete重新设置

重新设置是,不管该key之前有没有用,一切清空回到初始化,重新设置。对应类中实现类的源码是。

    /**
     * Updates RateLimiter's state and stores config to Redis server.
     *
     * @param mode - rate mode
     * @param rate - rate
     * @param rateInterval - rate time interval
     * @param rateIntervalUnit - rate time interval unit
     */
    void setRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);


    public RFuture<Void> setRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
         return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "redis.call('hset', KEYS[1], 'rate', ARGV[1]);"
                        + "redis.call('hset', KEYS[1], 'interval', ARGV[2]);"
                        + "redis.call('hset', KEYS[1], 'type', ARGV[3]);"
                        + "redis.call('del', KEYS[2], KEYS[3]);",
                Arrays.asList(getName(), getValueName(), getPermitsName()), rate, unit.toMillis(rateInterval), type.ordinal());
    }

核心是lua脚本

redis.call('hset', KEYS[1], 'rate', ARGV[1]);
redis.call('hset', KEYS[1], 'interval', ARGV[2]);
redis.call('hset', KEYS[1], 'type', ARGV[3]);
redis.call('del', KEYS[2], KEYS[3]);

上述参数如下

  • KEYS[1]:hash key name
  • KEYS[2]:string(value) key name
  • KEYS[3]:zset(permits) key name
  • ARGV[1]:rate
  • ARGV[2]:interval
  • ARGV[3]:type
    通过这个lua的逻辑,就能看出直接用的是hset,会直接重置配置参数,并且同时会将已产生数据的string(value)、zset(ppermis)两个key删掉。是一个彻底的重置方法。

这里回顾一下trySetRate和setRate(注意setRate在3.12.3这个版本是没有这个方法的),在限流器不变的场景下,我们可以多次调用trySetRate,但是不能调用setRate。因为每调用一次,redis.call(‘del’,keys[2],keys[3])就会将限流器中数据清空,也就达不到限流功能。

设置过期时间

有没有发现前面针对限流器设置的3个key,都没有设置过期时间。PRateLimiter接口设计上,将设置过期时间单独拎出来了。

 // 设置过期
    boolean expire(long var1, TimeUnit var3);
    // 清除过期(永不过期)
    boolean clearExpire();

这个方法是针对3个key一起设置过期时间。

获取令牌(核心)tryAcquire

获取令牌

 private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
        byte[] random = getServiceManager().generateIdArray();

        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "local rate = redis.call('hget', KEYS[1], 'rate');"
              + "local interval = redis.call('hget', KEYS[1], 'interval');"
              + "local type = redis.call('hget', KEYS[1], 'type');"
              + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
              
              + "local valueName = KEYS[2];"
              + "local permitsName = KEYS[4];"
              + "if type == '1' then "
                  + "valueName = KEYS[3];"
                  + "permitsName = KEYS[5];"
              + "end;"

              + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "

              + "local currentValue = redis.call('get', valueName); "
              + "local res;"
              + "if currentValue ~= false then "
                     + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                     + "local released = 0; "
                     + "for i, v in ipairs(expiredValues) do "
                          + "local random, permits = struct.unpack('Bc0I', v);"
                          + "released = released + permits;"
                     + "end; "

                     + "if released > 0 then "
                          + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                          + "if tonumber(currentValue) + released > tonumber(rate) then "
                               + "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "
                          + "else "
                               + "currentValue = tonumber(currentValue) + released; "
                          + "end; "
                          + "redis.call('set', valueName, currentValue);"
                     + "end;"

                     + "if tonumber(currentValue) < tonumber(ARGV[1]) then "
                         + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
                         + "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
                     + "else "
                         + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "res = nil; "
                     + "end; "
              + "else "
                     + "redis.call('set', valueName, rate); "
                     + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
                     + "redis.call('decrby', valueName, ARGV[1]); "
                     + "res = nil; "
              + "end;"

              + "local ttl = redis.call('pttl', KEYS[1]); "
              + "if ttl > 0 then "
                  + "redis.call('pexpire', valueName, ttl); "
                  + "redis.call('pexpire', permitsName, ttl); "
              + "end; "
              + "return res;",
                Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
                value, System.currentTimeMillis(), random);
    }

我们先看看执行lua脚本时,所有要传入的参数内容:
KEYS[1]: hash key name
KEYS[2]:全局string(value) key name
KEYS[3]:单机string(value) key name
KEYS[4]:全局zset(permits) key name
KEYS[5]:单机zset(permits) key name
ARGV[1]:当前请求令牌数量
ARGV[2]:当前时间
ARGV[3]:8位随机字符串
然后我们再将其中lua部分提取出来,我再根据自己的理解

-- rate:间隔时间内产生令牌数量
-- interval:间隔时间
-- type:类型:0-全局限流;1-单机限
local rate = redis.call('hget', KEYS[1], 'rate');
local interval = redis.call('hget', KEYS[1], 'interval');
local type = redis.call('hget', KEYS[1], 'type');
-- 如果3个参数存在空值,错误提示初始化未完成
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')
local valueName = KEYS[2];
local permitsName = KEYS[4];
-- 如果是单机限流,在全局key后拼接上机器唯一标识字符
if type == '1' then
    valueName = KEYS[3];
    permitsName = KEYS[5];
end ;
-- 如果:当前请求令牌数 < 窗口时间内令牌产生数量,错误提示请求令牌不能超过rate
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate');
-- currentValue = 当前剩余令牌数量
local currentValue = redis.call('get', valueName);
-- 非第一次访问,存储剩余令牌数量的 string(value) key 存在,有值(包括 0)
if currentValue ~= false then
    -- 当前时间戳往前推一个间隔时间,属于时间窗口以外。时间窗口以外,签发过的令牌,都属于过期令牌,需要回收回来
    local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
    -- 统计可以回收的令牌数量
    local released = 0;
    for i, v in ipairs(expiredValues) do
        -- lua struct的pack/unpack方法,可以理解为文本压缩/解压缩方法
        local random, permits = struct.unpack('fI', v);
        released = released + permits;
    end ;
    -- 移除 zset(permits) 中过期的令牌签发记录
    -- 将过期令牌回收回来,重新更新剩余令牌数量
    if released > 0 then
        redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);
        currentValue = tonumber(currentValue) + released;
        redis.call('set', valueName, currentValue);
    end ;
    -- 如果 剩余令牌数量 < 当前请求令牌数量,返回推测可以获得所需令牌数量的时间
    -- (1)最近一次签发令牌的释放时间 = 最近一次签发令牌的签发时间戳 + 间隔时间(interval)
    -- (2)推测可获得所需令牌数量的时间 = 最近一次签发令牌的释放时间 - 当前时间戳
    -- (3)"推测"可获得所需令牌数量的时间,"推测",是因为不确定最近一次签发令牌数量释放后,加上到时候的剩余令牌数量,是否满足所需令牌数量
    if tonumber(currentValue) < tonumber(ARGV[1]) then
        local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1);
        return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);
        -- 如果 剩余令牌数量 >= 当前请求令牌数量,可直接记录签发令牌,并从剩余令牌数量中减去当前签发令牌数量
    else
        redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1]));
        redis.call('decrby', valueName, ARGV[1]);
        return nil;
    end ;
    -- 第一次访问,存储剩余令牌数量的 string(value) key 不存在,为 null,走初始化逻辑
else
    redis.call('set', valueName, rate);
    redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1]));
    redis.call('decrby', valueName, ARGV[1]);
    return nil;
end ;

注意事项

trySetRate是基于hsetnx,这意味着一旦设置过Hash中的限流参数,就没法修改。那么如何保证可以修改?
1,当需要修改时,执行setRate,但最好注意执行时间,因为涉及到zset,string两个key,可能会影响当前的限流窗口。
2,给限流设置过期时间expire,当到达时间后,自行删除。注意的是:
expire 要在执行tryAcquire之后执行,才能保证3个key都成功设置过期时间。否则可能只有hash的key才有设置过期时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1477767.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kswapd0挖矿病毒攻击记录

文章目录 一、起因与病毒分析1、起因2、阿里云告警2.1 恶意脚本代码执行12.2 恶意脚本代码执行22.3恶意脚本代码执行32.4 恶意脚本代码执行4 3、病毒简单分析3.1 病毒的初始化3.2 病毒本体执行 4、总结 二、ubuntu自救指南1、病毒清理2、如何防御 一、起因与病毒分析 1、起因 …

跳跃游戏Ⅱ

问题 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - …

【深蓝学院】移动机器人运动规划--第7章 集群机器人运动规划--笔记

文章目录 0. Contents1. Multi-Agent Path Finding (MAPF)1.1 HCA*1.2 Single-Agent A*1.3 ID1.4 M*1.5 Conflict-Based Search(CBS)1.6 ECBS1.6.1 heuristics1.6.2 Focal Search 2. Velocity Obstacle (VO&#xff0c;速度障碍物)2.1 VO2.2. RVO2.3 ORCA 3. Flocking model&am…

EAS web 界面加载后,隐藏按钮

效果&#xff1a;隐藏下列按钮&#xff1a; 实现方法&#xff1a; 1、创建数据装载事件&#xff1a; 2、隐藏按钮&#xff1a; afterOnloadHideEntryTBBBBBB:function(e){console.log("----------失败222&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&a…

Redis主从、哨兵、Redis Cluster集群架构

Redis主从、哨兵、Redis Cluster集群架构 Redis主从架构 Redis主从架构搭建 主从搭建的问题 如果同步数据失败&#xff0c;查看log日志报错无法连接&#xff0c;检查是否端口未开放出现”Error reply to PING from master:...“日志&#xff0c;修改参数protected-mode no …

WebDAV之π-Disk派盘 + DAVx⁵

DAVx⁵是一款通过标准 CardDAV 和 CalDAV 协议同步通讯录、日历的 Android 应用,支持 iCloud 等云服务,只需要在 Android 端安装,即可实现在 iPhone 与 Android 间双向同步通讯录、日历、提醒事项等数据。 资源自动检测,支持自签名证书,通过客户端证书进行身份验证;可以…

如何使用Windows系统电脑无公网ip远程桌面Ubuntu系统

文章目录 前言1. ubuntu安装VNC2. 设置vnc开机启动3. windows 安装VNC viewer连接工具4. 内网穿透4.1 安装cpolar【支持使用一键脚本命令安装】4.2 创建隧道映射4.3 测试公网远程访问 5. 配置固定TCP地址5.1 保留一个固定的公网TCP端口地址5.2 配置固定公网TCP端口地址5.3 测试…

SpringBoot整合rabbitmq-直连交换机队列(二)

说明&#xff1a;本文章主要是Direct定向/直连类型交换机的使用&#xff0c;它的大致流程是将一个队列绑定到一个直连交换机上&#xff0c;并赋予一个路由键 routingkey&#xff0c;当一个消息携带着路由值为routingkey&#xff0c;这个消息通过生产者发送给交换机时&#xff0…

Unity UI适配规则和对热门游戏适配策略的拆解

前言 本文会介绍一些关于UI适配的基础概念&#xff0c;并且统计了市面上常见的设备的分辨率的情况。同时通过拆解目前市面上较为成功的两款休闲游戏Royal Match和Monopoly GO(两款均为近期游戏付费榜前几的游戏)&#xff0c;大致推断出他们的适配策略&#xff0c;以供学习和参…

CP AutoSar之LIN Driver详细说明

本文遵循autosar标准&#xff1a;R22-11 1 简介 本文指定了 AUTOSAR 基础软件模块 LIN 驱动程序的功能、API 和配置。 1.1 范围 LIN驱动程序适用于ISO 17987主节点和从节点。AUTOSAR中的LIN实现偏离了本LIN驱动器规范中所述的ISO 17987规范&#xff0c;但LIN总线上的行为不…

Allure报告归纳与总结

一、环境准备&#xff1a; 提前准备环境:Java1.8 Allure2 解析过程: 1.安装 allure2(信赖Java1.8) allure官方下载地址&#xff1a; https://repo.maven.apache.org/maven2/io/qameta/allure/allure-commandline/ 2.安装 allure-pytest 命令:pip install allure-pytest 3.生成…

Mamba 作者谈 LLM 未来架构

文章目录 前言 1、为什么注意力机制有效&#xff1f; 2、注意力计算量呈平方级增长 3、Striped Hyena 是个什么模型&#xff1f; 4、什么是 Mamba? 5、Mamba 硬件优化 6、2024年架构预测 7、对 AI 更多的预测 本片文章来自【机器之心】对Mamba作者进行采访所进行的编译整理。 …

TCP缓存

TCP缓存是指TCP协议在数据传输过程中使用的一种机制&#xff0c;用于临时存储和管理数据包。它主要有三个作用&#xff1a;提高网络性能、保证数据的可靠性和实现流量控制。 首先&#xff0c;TCP缓存可以提高网络性能。当发送端发送数据时&#xff0c;TCP协议会将数据分割成若…

自动驾驶加速落地,激光雷达放量可期(上)

1 激光雷达应用广泛&#xff0c;汽车有望成最大催化 激光雷达&#xff08;LiDAR&#xff09;是一种主动遥感技术&#xff0c;通过测定传感器发出的激光在传感器与目标物体之间的传播距离&#xff0c;来分析目标地物表面的反射能量大小、反射波谱的幅度、频率和相位等信息&#…

ensp模拟单臂路由实现不同两个网段主机访问

拓扑结构图如下 1.pc机配置略过 2.交换机配置 三个接口&#xff0c;两个连接pc&#xff0c;连接方式access&#xff0c;一个连接路由器 连接方式trunk sy #进入系统 视图模式 undo info-center enable #关闭信息 vlan batch 10 20#批量创建vlan int g 0/0/2#进入2端口 p…

黑马嵌入式开发数电模电基础课

本课程旨在为学习者提供扎实的嵌入式系统开发所需的数字电路和模拟电路知识。通过理论与实践相结合的方式&#xff0c;学习者将掌握数字信号处理、模拟信号调节等关键概念&#xff0c;为将来在嵌入式系统设计与开发领域取得成功打下坚实基础。 视频大小&#xff1a;4.9G 课程…

【Vue3】回顾watch,学习watchEffect

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

生产报工异常信息提示器如何精确提醒管理人员

在现代生产环境中&#xff0c;生产报工异常信息的及时提醒对于管理人员来说至关重要。为了精确提醒管理人员并确保生产流程的顺利进行&#xff0c;智能信息接收腕表作为一种先进的工具&#xff0c;结合了多项功能&#xff0c;可以有效地实现生产报工异常信息的精确提醒。以下将…