Java Resilience4j-RateLimiter学习

news2024/11/16 21:59:42

一. 介绍

Resilience4j-RateLimiter 是 Resilience4j 中的一个限流模块,我们对 Resilience4j 的 CircuitBreaker、Retry 已经有了一定的了解,现在来学习 RateLimiter 限流器;

引入依赖;

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <!--jdk17对应的版本-->
    <version>2.2.0</version>
</dependency>

二. 配置项

和 Retry 类似,RateLimiter 中也有一些配置项,对应 RateLimiterConfig 类的配置项;

RateLimiter 的配置项相比 CircuitBreaker、Retry 来说非常少,我们看下它的几个配置项;

  • limitRefreshPeriod:刷新限流的时间;
  • limitForPeriod:在刷新周期内的最大允许请求数,也就是最大 permission 数;
  • timeoutDuration:获取 permission 的最大等待时间,超过此时间的话则认为无法获取到 permission,需要进行限流;

三. 简单使用

我们模拟在一个主线程中循序执行逻辑,看是否触发限流,以及触发几次限流;

public class TestRateLimiter01 {

    public static void main(String[] args) {
        // 创建一个限流配置
        RateLimiterConfig config = RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))   // 每秒刷新限流
            .limitForPeriod(10)                          // 每秒允许的最大请求数
            .timeoutDuration(Duration.ofMillis(200))     // 获取 permission 的最大等待时间,200ms
            .build();
        RateLimiterRegistry registry = RateLimiterRegistry.custom()
            .withRateLimiterConfig(config).build();
        RateLimiter rateLimiter = registry.rateLimiter("myRateLimiter");


        for (int i = 0; i < 23; i++) {
            try {
                rateLimiter.executeRunnable(() -> System.out.println("--" + System.currentTimeMillis()));
            } catch (RequestNotPermitted ex) {
                System.out.println("发生了限流" + System.currentTimeMillis());
            }
        }
    }
}

打印如下:

--1723888206695
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
发生了限流1723888206903
发生了限流1723888207104
发生了限流1723888207309
发生了限流1723888207512
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698

可以看到出现了 4 次限流;

分析:

  1. 由于获取 permission 的最大等待时间是 200ms,permission 的刷新周期是 1 s,也就是 1000 ms;且我们只有一个主线程;前 10 次顺利执行后,所剩的 permission 为 0;
  2. 第 11 次请求,到下一个周期大概还有 800 ms,大于我们获取 permission 的最大等待时间 200 ms,此时获取不到 permission,阻塞等待 200 ms,并限流;以此类推;
  3. 第 15 次请求,到下一个周期大概还有 180 ms,小于我们获取 permission 的最大等待时间 200 ms,此时能够获取到 permission,需要阻塞等待 180 ms,等待下一个周期的到来;
  4. 第 16 - 23 次请求,正常调用;

四. 限流算法

我们先看官网的这张图;

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Resilience4j 总共有两种实现:

  • 基于 Java 信号量(Semaphore-Based Rate Limiter)
  • 基于原子计数器(Atomic Rate Limiter)

原子计数器(Atomic Rate Limiter)是默认的实现,我们看 AtomicRateLimiter,有时间的话再了解基于信号量的算法;

上图就是 AtomicRateLimiter 的实现示意图,它通过 AtomicReference 管理其状态。 其中,AtomicRateLimiter.State 是不可变的,并且具有以下字段:

  • activeCycle:上一次调用使用的周期号;
  • activePermissions:上次调用后的可用权限数;如果可以保留某些权限,则可以为负;
  • nanosToWait:等待上一次呼叫的等待许可的纳秒数;

主要逻辑是:

  1. 将时间分成相等的部分,称为循环;在任何时候,我们都可以通过计算 currentTime / cyclePeriod 来确定当前周期;
  2. 如果我们知道限制器最后一次使用的当前周期数和周期,那么我们实际上可以计算出应该在限制器中出现多少个权限;
  3. 经过此计算后,如果可用权限还不够,我们可以通过减少当前权限并计算我们等待它出现的时间来判断执行权限保留;
  4. 经过所有计算后,我们可以产生一个新的限制器状态并将其存储在 AtomicReference 中;

五. 分析

1. executeRunnable()

我们直接从 RateLimiter.executeRunnable() 入手;

// ------------------------------------- RateLimiter ------------------------------------
default void executeRunnable(Runnable runnable) {
    // permits 为 1,即每次请求都获取一个 permit
    executeRunnable(1, runnable);
}


// ------------------------------------- RateLimiter ------------------------------------
default void executeRunnable(int permits, Runnable runnable) {
    decorateRunnable(this, permits, runnable).run();
}


// ------------------------------------- RateLimiter ------------------------------------
static Runnable decorateRunnable(RateLimiter rateLimiter, int permits, Runnable runnable) {
    return decorateCheckedRunnable(rateLimiter, permits, runnable::run)
        .unchecked();
}



// ------------------------------------- RateLimiter ------------------------------------
static CheckedRunnable decorateCheckedRunnable(RateLimiter rateLimiter, int permits,
                                               CheckedRunnable runnable) {
    return () -> {
        // 1. 等待获取 permission
        waitForPermission(rateLimiter, permits);
        try {
            // 2. 执行 runnable
            runnable.run();
            
            // rateLimiter.onSuccess() 和 onError() 是统计用的,可以先不看
            rateLimiter.onSuccess();
        } catch (Exception exception) {
            rateLimiter.onError(exception);
            throw exception;
        }
    };
}

先等待获取 permission,只有获取到 permission 的情况下才能执行 runnable;waitForPermission() 是核心方法;

2. waitForPermission()

// ------------------------------------- RateLimiter ------------------------------------
static void waitForPermission(final RateLimiter rateLimiter, int permits) {
    // 1. 调用 rateLimiter.acquirePermission(permits) 来获取 permits 数量的 permission
    // 默认使用的 RateLimiter 是 AtomicRateLimiter,我们主要分析 AtomicRateLimiter
    boolean permission = rateLimiter.acquirePermission(permits);
    if (Thread.currentThread().isInterrupted()) {
        throw new AcquirePermissionCancelledException();
    }
    
    // 2. 如果获取失败,此时需要限流,抛出 RequestNotPermitted 异常
    if (!permission) {
        throw RequestNotPermitted.createRequestNotPermitted(rateLimiter);
    }
}

3. acquirePermission()

获取 permission 调用的是 RateLimiter.acquirePermission(int permits),我们主要看 AtomicRateLimiter(令牌桶限流);

// ------------------------------------- AtomicRateLimiter ------------------------------------
public boolean acquirePermission(final int permits) {
  	// 1. timeoutInNacnos 为获取 permission 的最大等待时间
    long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
    
    // 2. 获取下一个状态
    State modifiedState = updateStateWithBackOff(permits, timeoutInNanos);
    
    // 3. 看是否能获取到 permission,获取到返回 true,获取不到返回 false
    boolean result = waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);
    
    // 4. 发布事件
    publishRateLimiterAcquisitionEvent(result, permits);
    
    // 返回获取结果
    return result;
}

我们主要看第 2 步和第 3 步;

3.1 updateStateWithBackOff()

updateStateWithBackOff() 主要用于更新 State,通过 CAS 的方式更新 State;

// ------------------------------------- AtomicRateLimiter ------------------------------------
private AtomicRateLimiter.State updateStateWithBackOff(long timeoutInNanos) {
    AtomicRateLimiter.State prev;
    AtomicRateLimiter.State next;
    do {
        prev = (AtomicRateLimiter.State)this.state.get();
        // 执行 calculateNextState()
        next = this.calculateNextState(timeoutInNanos, prev);
    } while(!this.compareAndSet(prev, next));

    return next;
}

calculateNextState() 比较复杂,逻辑如下:

// ------------------------------------- AtomicRateLimiter ------------------------------------
private AtomicRateLimiter.State calculateNextState(long timeoutInNanos, 
                                                   AtomicRateLimiter.State activeState) {
    // 每个时间段对应纳秒数,由配置文件中的 limitRefreshPeriodInMillis 计算而来
    long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriodInNanos();
    //每个时间段内可执行次数,对应配置文件中的limitForPeriod
    int permissionsPerCycle = activeState.config.getLimitForPeriod();
    
    // 计算从本类初始化到现在的纳秒数
    long currentNanos = this.currentNanoTime();
    // 计算当前 cycle 数 
    long currentCycle = currentNanos / cyclePeriodInNanos;
    
    long nextCycle = activeState.activeCycle;
    int nextPermissions = activeState.activePermissions;
    
    
    // 1. 如果已经进入后续的 cycle,重置 nextCycle 和 nextPermissions 值
    // nextPermissions 需要通过计算得到
    // 这是因为 activeState.activePermissions 会有赊账的情况,可能会存在负值
    // 所以 nextPermissions = Long.min(nextPermissions + nextState, permissionsPerCycle)
    long nextNanosToWait;
    if(nextCycle != currentCycle) {
        nextNanosToWait = currentCycle - nextCycle;
        long nextState = nextNanosToWait * permissionsPerCycle;
        nextCycle = currentCycle;
        nextPermissions = Long.min(nextPermissions + nextState, permissionsPerCycle);
    }

    // 2. 计算所需等待时间
    nextNanosToWait = this.nanosToWaitForPermission(cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos, currentCycle);
    
    // 3. 需要根据 nextNanosToWait 和 timeoutInNanos 做对比
    // 所需时间和超时时间做对比,判断能否在能及时执行完
    AtomicRateLimiter.State nextState1 = this.reservePermissions(activeState.config, timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait);
    return nextState1;
}
3.1.1 nanosToWaitForPermission()

我们看下 nanosToWaitForPermission() 的实现,逻辑为判断是否还有可用执行次数,如果还有次数则直接返回 0,表示不需要等待时间;

否则计算总共需要等待的时间,如果所需的 permits 过大,可能会导致需要等待很多个 cycle;对于我们正常使用来说,permits 一般都为 1,这里一般最多等待 nanosToNextCycle,即到下一个时间周期的剩余时间;

// ------------------------------------- AtomicRateLimiter ------------------------------------
private long nanosToWaitForPermission(final int permits, final long cyclePeriodInNanos,
                                      final int permissionsPerCycle,
                                      final int availablePermissions, 
                                      final long currentNanos, final long currentCycle) {
    if (availablePermissions >= permits) {
        return 0L;
    }
    long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos;
    long nanosToNextCycle = nextCycleTimeInNanos - currentNanos;
    int permissionsAtTheStartOfNextCycle = availablePermissions + permissionsPerCycle;
    int fullCyclesToWait = divCeil(-(permissionsAtTheStartOfNextCycle - permits),
                                   permissionsPerCycle);
    
    // 一般等待时间都为 nanosToNextCycle
    return (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle;
}

下述为了解内容;

1、如果 permits 过大,示例如下,需要等待一个周期 + nanosToNextCycle;

availablePermissions = 2
permits = 20
permissionsAtTheStartOfNextCycle = 2+10 = 12
fullCyclesToWait = divCeil (-(12-20), 10) = divCeil(8, 10) = 1

2、如果我们设置的 timeoutInNanos 过大,比如为 6 秒,可能会出现赊账严重,示例如下,需要等待两个周期 + nanosToNextCycle;所以我们尽量不要设置 timeoutInNanos 过大;

availablePermissions = -22
permits = 1
permissionsAtTheStartOfNextCycle = -22+10 = -12
fullCyclesToWait = divCeil (-(-12-1), 10) = divCeil(13, 10) = 2
3.1.2 reservePermissions()

我们再来看下 reservePermissions() 的实现;

根据 nextNanosToWait 和 timeoutInNanos 做对比,将所需时间和超时时间做对比,判断能否在能及时执行完;

  • timeoutInNanos >= nanosToWait:能及时执行完,可用次数 permission-1,同时更新 cycle、nanosToWait;返回新的 State 对象;
  • timeoutInNanos < nanosToWait:不能及时执行完,permission 不变,同时更新 cycle、nanosToWait;返回新的 State 对象;
// ------------------------------------- AtomicRateLimiter ------------------------------------
private State reservePermissions(final RateLimiterConfig config, final int permits,
                                 final long timeoutInNanos,
                                 final long cycle, final int permissions, final long nanosToWait) {
    boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
    int permissionsWithReservation = permissions;
    if (canAcquireInTime) {
        permissionsWithReservation -= permits;
    }
    return new State(config, cycle, permissionsWithReservation, nanosToWait);
}

3.2 waitForPermissionIfNecessary()

// ------------------------------------- AtomicRateLimiter ------------------------------------
private boolean waitForPermissionIfNecessary(final long timeoutInNanos,
                                             final long nanosToWait) {
    boolean canAcquireImmediately = nanosToWait <= 0;
    boolean canAcquireInTime = timeoutInNanos >= nanosToWait;

    // 1. nanosToWait == 0 的情况,表示立即获取到了 permission,返回 true
    if (canAcquireImmediately) {
        return true;
    }
    
    // 2. timeoutInNanos >= nanosToWait,表示需要等待 nacosToWait 到下一个时间周期
    // 调用线程会在此处阻塞等待 nanosToWait 时间,等待完成后返回 true
    if (canAcquireInTime) {
        return waitForPermission(nanosToWait);
    }
    
    // 3. timeoutInNanos < nanosToWait,超过我们指定的获取 permission 的最大等待时间
    // 调用线程会在此处阻塞等待 timeoutInNanos 时间,等待完成后返回 false,表示获取失败,需要限流
    waitForPermission(timeoutInNanos);
    return false;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2049734.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

序列建模之循环和递归网络 - 递归神经网络篇

序言 在序列建模的广阔领域中&#xff0c;递归神经网络&#xff08; Recursive Neural Network, RNN \text{Recursive Neural Network, RNN} Recursive Neural Network, RNN&#xff09;&#xff0c;注意此处的 RNN \text{RNN} RNN与常用于序列处理的循环神经网络在命名上有所…

【生成式人工智能-十五-经典的影像生成方法-GAN】

经典的影像生成方法-GAN GANDiscriminatorGenerator还需要加入额外信息么 GAN可以加在其他模型上面我们可以用影像生成模型做什么&#xff1f; 前面讲过VAE和Flow-based以及diffusion Model &#xff0c;今天讲最后一种经典的生成方法GAN。 GAN 前面讲的几种模型都是用加入额外…

红黑树剖析(插入部分)

文章目录 红黑树插入节点情景分析情景1&#xff1a;红黑树为空树情景2&#xff1a;插入节点的Key已存在情景3&#xff1a;插入节点的父节点为黑色节点情景4&#xff1a;插入节点的父节点为红色情景4.1 叔叔节点存在并且为红色节点情景4.2 叔叔节点存在而且是黑色节点情景4.3 叔…

xss 一些例子

目录 XSS 1.Ma Spaghet!​编辑 2.Jefff​编辑 3.Ugandan Knuckles​编辑 4.Ricardo Milos​编辑 5.Ah Thats Hawt​编辑 6.Ligma​编辑 7.Mafia​编辑 简单解法就是换一个函数 作者得原意解法 8.Ok, Boomer​编辑 XSS 1.Ma Spaghet! 这里接收了一个somebody参数&…

Chain of Thought (CoT) 系列论文:大模型思维链,提升 LLM 的推理能力

文章目录 1. COT&#xff1a;Chain of Thought1. 研究背景2. CoT的原理3. CoT Prompt 1. COT&#xff1a;Chain of Thought COT 是 2022.01 由 google 提出的针对提升 LLM 的推理能力的 Prompt Engineering 方法。 paper&#xff1a; Chain-of-Thought Prompting Elicits Re…

一器多能,数据文件处理的瑞士军刀 — dasel

Dasel&#xff1a;简化数据操作&#xff0c;提升开发效率。- 精选真开源&#xff0c;释放新价值。 概览 dasel是一款专为开发者设计的高效数据文件操作工具&#xff0c;它允许用户通过统一的接口对JSON、TOML、YAML、XML和CSV等格式的文件进行数据选择、插入和删除操作。这款工…

Kafka基本概念及消费流程

Kafka是消息中间件的一种&#xff0c;相较于其他消息中间件&#xff0c;其以极高的吞吐量闻名&#xff0c;常用于构建实时数据管道和流应用&#xff0c;能够处理高吞吐量的数据流。以下是Kafka中的重要概念&#xff1a; 1. 生产者 生产者是向Kafka主题发送消息的客户端。生产…

登录 k8s-Dashboard 显示 Your connection is not private

文章目录 一、背景二、解决方案 一、背景 部署好 kubernetes-Dashboard 后使用 master节点的 ipport 登录 Dashboard 显示 Your connection is not private 无论是 Edge 还是 Google Chrome 都是这样的情况 二、解决方案 点击网页空白处&#xff0c;英文输入法输入&#xf…

论文解读:LONGWRITER: UNLEASHING 10,000+ WORD GENERATION FROM LONG CONTEXT LLMS

摘要 现象&#xff1a;当前的大预言模型可以接受超过100,000个tokens的输入&#xff0c;但是却难以生成超过2000个token的输出。 原因&#xff1a;监督微调过程(SFT)中看到的样本没有足够长的样本。 解决方法&#xff1a; Agent Write&#xff0c;可以将长任务分解为子任务&a…

为什么MCU I2C波形中会出现的脉冲毛刺?

在I2C的波形中&#xff0c;经常会发现有这样的脉冲毛刺&#xff0c;会被认为是干扰或者器件不正常。 看到这个波形时&#xff0c;可以先数一下出现在第几个clock的位置&#xff0c;如果出现在第9个clock的低电平期间&#xff0c;就不是干扰或者器件异常导致。 在I2C的协议中&a…

Java并发类的主要API方法-CountDownLatch和CyclicBarrier

1.概念介绍 CountDownLatch 是一个计数器&#xff0c;计数器的初始值由创建它时指定。每次调用 countDown() 方法时&#xff0c;计数器会减1&#xff0c;直到计数器值变为0时&#xff0c;所有调用 await() 的线程都会被唤醒继续执行。 CyclicBarrier 是 Java 中另一个常用的同…

基于CDIO概念的人工智能物联网系统开发与实施的人才培养研究

目录 1. 引言&#xff08;Introduction&#xff09; 2. AIoT技术及其培训特点&#xff08;The Characteristics of AIOT and Its Training&#xff09; 3. 基于CDIO概念的AIoT课程改革&#xff08;CDIO Concept-based Reform of AIOT Course&#xff09; 4. AIoT课程内容安…

SweetAlert2

1. SweetAlert2 SweetAlert2是一个基于JavaScript的库, 用于在网页上替换标准的警告框(alert), 确认框(confirm)和提示框(prompt), 并提供更加美观和用户友好的界面.需要在项目中引入SweetAlert2, 可以通过CDN链接或者将库文件下载到你的项目中来实现这一点. 通过CDN引入:<…

C++:stack类(vector和list优缺点、deque)

目录 前言 数据结构 deque vector和list的优缺点 push pop top size empty 完整代码 前言 stack类就是数据结构中的栈 C数据结构&#xff1a;栈-CSDN博客 stack类所拥有的函数相比与string、vector和list类都少很多&#xff0c;这是因为栈这个数据结构是后进先出的…

SPRING09_ Bean后置处理器创建过程、SmartInstantiationAwareBeanPostProcessor预测方法调用

文章目录 ①. Bean后置处理器创建过程②. SmartInstantiationAwareBeanPostProcessor预测方法调用 ①. Bean后置处理器创建过程 ①. 坏境准备,在BeanPostProcessor的无参构造器、postProcessBeforeInitialization以及postProcessAfterInitialization打上断点.以xml的方式启动容…

秋招突击——8/15——新作{最大子数组和、合并区间、转轮数组、除自身以外的数组的乘积}

文章目录 引言新作最大子数组和个人实现参考实现 合并区间个人实现短板补充——自定义排序标准 参考实现 转轮数组最终实现 除自身以外数组的乘积个人实现 总结 引言 以前刷题的方式方法有问题&#xff0c;花太多时间了&#xff0c;应该先过一遍&#xff0c;然后再针对特定的题…

第一百九十四节 Java集合教程 - Java优先级队列

Java集合教程 - Java优先级队列 优先级队列是其中每个元素具有相关联的优先级的队列。具有最高优先级的元素将从队列中删除。 PriorityQueue 是一个实现类对于Java Collection Framework中的无界优先级队列。 我们可以使用在每个元素中实现的 Comparable 接口作为其优先事项。…

C# OnnxRuntime YoloV5 Demo

目录 效果 模型信息 项目 代码 Form1.cs YoloV5.cs 下载 效果 模型信息 Model Properties ------------------------- --------------------------------------------------------------- Inputs ------------------------- name&#xff1a;images tensor&#xff1a…

机器学习/人工智能中的学习证明

一、说明 在进行任何数学发展之前&#xff0c;我们必须首先了解学习的基础以及它如何与错误的概念密切相关。关于代价函数&#xff0c;它的工作原理是梯度下降原理。本文将回顾梯度下降原理。 二、假想的厨师 想象一下&#xff0c;在任何一天&#xff0c;你决定复制你在一家著名…

8.13 Day19 Windows服务器(Windows service 2008 R2)上域的搭建 (1)

域服务器&#xff08;DC&#xff09;&#xff1a;安装了活动目录服务的服务器就称为DC。 将三台设备配置在同一网络中&#xff0c;此处将外部网络隔离开&#xff0c;只将他们放在局域网中 服务端网络配置&#xff0c;此时与外部网络彻底隔绝开&#xff0c;且已无法和主机通信&…