【源码解析】Redisson分布式限流器RRateLimiter源码分析

news2024/11/24 6:05:01

前面已经写了一篇Redisson的分布式限流的使用,Redisson分布式限流的简单实践,对其中的原理很好奇。

一、使用

// 1、 声明一个限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);

// 2、 设置速率,5秒中产生3个令牌
rateLimiter.trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);

// 3、试图获取一个令牌,获取到返回true
rateLimiter.tryAcquire(1)

二、原理

  1. getRateLimiter
// 声明一个限流器 名称 叫key
redissonClient.getRateLimiter(key)
  1. RedissonRateLimiter#trySetRate。5秒中产生3个令牌。rateInterval指的是时间间隔,rate指的是指定时间间隔产生的令牌数。
@Override
public boolean trySetRate(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
    return get(trySetRateAsync(type, rate, rateInterval, unit));
}

@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
          + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
          + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",
            Collections.singletonList(getRawName()), rate, unit.toMillis(rateInterval), type.ordinal());
}
  1. RedissonRateLimiter#tryAcquire()
    • key的数组是 Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName())
    • 如果name是rate.limiter,那么lua脚本中的valueName{rate.limiter}:valuepermitsName{rate.limiter}:permitskey[3]的结果是getClientValueName(){rate.limiter}:value:4802866e-25b3-4482-aa74-61aa947d6f7a,需要拼接机器的唯一id。key[5]的结果是{rate.limiter}:permits:4802866e-25b3-4482-aa74-61aa947d6f7a
    • tonumber(rate) >= tonumber(ARGV[1]),表明rate要比请求的令牌数大。
    • 如果首次获取,设置valueName为rate,设置permitsName的score为当前时间戳,设置值为随机数和获取的令牌数,更新valueName,减去需要获取的令牌数。
    • 第二次获取令牌执行,获取0-(当前时间-生成令牌间隔interval)时间内的数据。获取之前所有的请求数released,如果released>0,更新valueName为当前值+释放令牌数。之前的请求令牌数 > 0, 例如10s产生3个令牌,现在超过10s了,重置周期并计算剩余令牌数。
    • 如果当前可提供的令牌数小于获取的令牌数,获取最近一次的记录。返回当前key的剩余过期时间。上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) ,这个值表示还需要多久才能生产出足够的令牌。
    • 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,直接更新zset。
    @Override
    public boolean tryAcquire(long permits) {
        return get(tryAcquireAsync(RedisCommands.EVAL_NULL_BOOLEAN, permits));
    }

    private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "local rate = redis.call('hget', KEYS[1], 'rate');"
              + "local interval = redis.call('hget', KEYS[1], 'interval');"
              + "local type = redis.call('hget', KEYS[1], 'type');"
              + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
              
              + "local valueName = KEYS[2];"
              + "local permitsName = KEYS[4];"
              + "if type == '1' then "
                  + "valueName = KEYS[3];"
                  + "permitsName = KEYS[5];"
              + "end;"

              + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "

              + "local currentValue = redis.call('get', valueName); "
              + "if currentValue ~= false then "
                     + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                     + "local released = 0; "
                     + "for i, v in ipairs(expiredValues) do "
                          + "local random, permits = struct.unpack('fI', v);"
                          + "released = released + permits;"
                     + "end; "

                     + "if released > 0 then "
                          + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                          + "currentValue = tonumber(currentValue) + released; "
                          + "redis.call('set', valueName, currentValue);"
                     + "end;"

                     + "if tonumber(currentValue) < tonumber(ARGV[1]) then "
                         + "local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1); "
                         + "return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);"
                     + "else "
                         + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "return nil; "
                     + "end; "
              + "else "
                     + "redis.call('set', valueName, rate); "
                     + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
                     + "redis.call('decrby', valueName, ARGV[1]); "
                     + "return nil; "
              + "end;",
                Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
                value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong());
    }

RedissonRateLimiter#getValueName,生成valueName

    String getValueName() {
        return suffixName(getRawName(), "value");
    }  

    String getPermitsName() {
        return suffixName(getRawName(), "permits");
    }

	public static String suffixName(String name, String suffix) {
        if (name.contains("{")) {
            return name + ":" + suffix;
        }
        return "{" + name + "}:" + suffix;
    }

RedissonRateLimiter#tryAcquire(long, long, java.util.concurrent.TimeUnit),带时限的获取令牌。delay就是lua脚本返回的,还需要多久才会有令牌。如果获取令牌的时间比设置的超时时间还要大的话,直接就false了,否则会再次尝试获取令牌。

	@Override
	public boolean tryAcquire(long permits, long timeout, TimeUnit unit) {
    	return get(tryAcquireAsync(permits, timeout, unit));
	}

    @Override
    public RFuture<Boolean> tryAcquireAsync(long permits, long timeout, TimeUnit unit) {
        RPromise<Boolean> promise = new RedissonPromise<Boolean>();
        long timeoutInMillis = -1;
        if (timeout >= 0) {
            timeoutInMillis = unit.toMillis(timeout);
        }
        tryAcquireAsync(permits, promise, timeoutInMillis);
        return promise;
    }
    
    private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
        long s = System.currentTimeMillis();
        RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
        future.onComplete((delay, e) -> {
            if (e != null) {
                promise.tryFailure(e);
                return;
            }
            
            if (delay == null) {
                promise.trySuccess(true);
                return;
            }
            
            if (timeoutInMillis == -1) {
                commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                    tryAcquireAsync(permits, promise, timeoutInMillis);
                }, delay, TimeUnit.MILLISECONDS);
                return;
            }
            
            long el = System.currentTimeMillis() - s;
            long remains = timeoutInMillis - el;
            if (remains <= 0) {
                promise.trySuccess(false);
                return;
            }
            if (remains < delay) {
                commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                    promise.trySuccess(false);
                }, remains, TimeUnit.MILLISECONDS);
            } else {
                long start = System.currentTimeMillis();
                commandExecutor.getConnectionManager().getGroup().schedule(() -> {
                    long elapsed = System.currentTimeMillis() - start;
                    if (remains <= elapsed) {
                        promise.trySuccess(false);
                        return;
                    }
                    
                    tryAcquireAsync(permits, promise, remains - elapsed);
                }, delay, TimeUnit.MILLISECONDS);
            }
        });
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/508502.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何把软件从C盘移到D盘?

​为什么要把软件从C盘移到D盘&#xff1f; C盘是安装操作系统的系统分区。虽然很多用户在安装系统的时候会给C盘分配了大量的磁盘空间&#xff0c;但是大多数用户会发现C盘很快就会无缘无故的被占满。这是为什么呢&#xff1f;这主要是由于大多数三方程序默认安装在C盘造成…

安科瑞5G智慧水务能效管理平台在九江环境治理中的应用

摘要&#xff1a;当下&#xff0c;以数字孪生为主的数字技术愈发成熟&#xff0c;为使得长江水环境治理能够“长治久安”&#xff0c;上海院在长江大保护先行先试城市九江城中水环境治理中启用了智慧水务先进理念&#xff0c;搭建了基于数字孪生技术的智慧水务平台。通过数字孪…

Vue 页面列表中部门类型根据层级缩进显示 ASCII 160

如上图, 部门类型都是一长溜, 没有根据级别进行缩进. 修改的时候尝试了在类型字段前面加了空格(ASCII-32), tab制表符(ASCII-09), 但是加载的时候都被去掉了,没有起作用. 然后就想这找个特殊空格试下,类似于CF中的空白用户名. 后来查了下ASCII码表发现还有挺多的, 不过经过测试…

U-boot常用命令(一)

信息查询命令 bdinfo&#xff0c;查看板子信息。 可以得到DRAM的起始地址和大小、启动参数保存起始地址、波特率、sp&#xff08;堆栈指针&#xff09;起始地址信息。 printenv输出环境变量信息&#xff0c;uboot 也支持 TAB 键自动补全功能&#xff0c;输入“print” 然后按…

STL-deque容器

双端数组&#xff0c;可以对头端进行插入删除操作 deque 容器和 vecotr 容器有很多相似之处&#xff0c;比如&#xff1a; deque 容器也擅长在序列尾部添加或删除元素&#xff08;时间复杂度为O(1)&#xff09;&#xff0c;而不擅长在序列中间添加或删除元素。deque 容器也可…

web自动化测试进阶篇01 ——— 策略模式的实践与技巧

&#x1f60f;作者简介&#xff1a;博主是一位测试管理者&#xff0c;同时也是一名对外企业兼职讲师。 &#x1f4e1;主页地址&#xff1a;【Austin_zhai】 &#x1f646;目的与景愿&#xff1a;旨在于能帮助更多的测试行业人员提升软硬技能&#xff0c;分享行业相关最新信息。…

【rust】| 02——语法基础_变量(不可变?)和常量

系列文章目录 【rust】| 00——开发环境搭建 【rust】| 01——编译并运行第一个rust程序 【rust】| 02——语法基础_变量(不可变?)和常量 文章目录 1. 变量1.1 变量的定义1.2 试验变量的不可变特性 2. 常量2.1 常量的定义 3. 覆盖(同名变量)3.1 修改已定义变量的数据类型3.2 1…

第5章 负载均衡

第5章 负载均衡 5.1 proxy_pass详解 在nginx中配置proxy_pass代理转发时&#xff0c;如果在proxy_pass后面的url加/&#xff0c;表示绝对根路径&#xff1b;如果没有/&#xff0c;表示相对路径&#xff0c;把匹配的路径部分也给代理走。 假设下面四种情况分别用 http://192.…

深度学习与文本聚类:一篇全面的介绍与实践指南

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

vivado手写ROM改IP核

一、引言 手写了一个ROM&#xff0c;用于ADC或者DAC的寄存器配置。DAC出来的波形总是有两个对称的小肩膀&#xff0c;找不到原因。时序没有报错&#xff0c;但是有延迟。之前听同事说他们也遇到过这样的问题&#xff0c;是时序问题。所以&#xff0c;我也想试一下&#xff…

杂谈系列:唐高祖~开元通宝

唐高祖李渊&#xff08;566年12月 &#xff0d;635年6月 &#xff09;&#xff0c;字叔德。中国唐朝开国皇帝&#xff08;618年6月18日&#xff0d;626年9月4日在位&#xff09;。 作为唐朝开国的帝王&#xff0c;史学界对李渊的评价不一。有观点认为他优柔失断&#xff0c;赏罚…

源码级别讲解 redis 底层数据结构

redis 底层数据结构 Redis作为Key-Value存储系统&#xff0c;数据结构如下&#xff1a; Redis没有表的概念&#xff0c;Redis实例所对应的db以编号区分&#xff0c;db本身就是key的命名空间。 比如&#xff1a;user:1000作为key值&#xff0c;表示在user这个命名空间下id为10…

二叉树的层级遍历以及[NOIP2015 普及组] 扫雷游戏、有效时间的数目

一、二叉树的层级遍历 二叉树的层级遍历看着比其他遍历简单&#xff0c;但是我感觉实施起来却比其他遍历难&#xff0c;它主要是通过队列实现的 比如在这样的一颗二叉树中 我没先将a入队 队列&#xff1a;a 当a出队的时候就将它的左儿子和右儿子入队 队列&…

一图看懂 toml 模块:用于解析和创建TOML(Tom‘s Obvious, Minimal Language)的Python库, 资料整理+笔记(大全)

本文由 大侠(AhcaoZhu)原创&#xff0c;转载请声明。 链接: https://blog.csdn.net/Ahcao2008 [TOC](一图看懂 toml 模块&#xff1a;用于解析和创建TOML(Tom’s Obvious, Minimal Language)的Python库, 资料整理笔记&#xff08;大全&#xff09;) ☘️摘要 全文介绍系统内置…

leap模型重点关注技术,如:能源结构清洁转型、重点领域如工业、交通节能减排降耗、新能源发电系统及发电成本最优化、区域碳达峰碳中和实现路径设计及政策评估

模型简介&#xff1a; 中文名&#xff1a;LEAP模型 外文名&#xff1a;Long Range Energy Alternatives Planning System/ Low emission analysis platform 采用部门分析法建立的LEAP模型&#xff08;长期能源可替代规划模型&#xff09;是一种自下而上的能源-环境核算工具&a…

Spark大数据处理讲课笔记3.8 Spark RDD典型案例

文章目录 零、本节学习目标一、利用RDD计算总分与平均分&#xff08;一&#xff09;提出任务&#xff08;二&#xff09;准备工作1、启动HDFS服务2、启动Spark服务3、在本地创建成绩文件4、将成绩文件上传到HDFS &#xff08;三&#xff09;实现步骤1、打开RDD项目2、创建计算总…

FS2455高效率的同步降压DC-DC转换器5A输出电流

概述 FS2455是一种高效率的同步降压DC-DC转换器&#xff0c;具有5A输出电流。 FS2455在4.5V到30V的宽输入电压范围内工作&#xff0c; 集 成主开关和同步开关&#xff0c;具有非常低的RDS&#xff08;ON&#xff09;以最小化传导损失。 FS2455具有轻载时的应用和高效率。此外…

[GFCTF 2021]文件查看器(GZ、过滤器、phar) day4

打开界面直接一个登录界面&#xff0c;直接admin/admin登录进去 。 进来之后发现是一个文件查看器的功能 随便输入了点东西发现了报错&#xff0c;然后读取文件的功能&#xff0c;输入Files.classs.php发现读取不成功 换了个index.php <?phpfunction __autoload($classN…

无效的目标发行版: 11

背景&#xff1a;最近在研究es&#xff0c;想着弄一个连接es集群的springboot的工程&#xff0c;然后就在网上找到一个&#xff0c;结果弄到本地运行时&#xff0c;报错了“ 无效的目标发行版: 11 ” 看着报错就知道肯定是你导入的项目和你本地的JDK版本不匹配了&#xff0c;然…

手把手教你如何将安卓手机数据导入iPhone!【详解】

案例&#xff1a;安卓数据导入苹果手机 【大神们&#xff0c;刚换了新的苹果手机&#xff0c;原本的安卓手机数据怎么导入新手机&#xff1f;】 想要换用iPhone&#xff0c;但是又不想丢失安卓手机里的重要数据怎么办&#xff1f;如何将安卓手机数据导入iphone&#xff1f;本文…