限流,流量整形算法

news2024/9/19 23:58:00

写在前面

本文看下流量整形相关算法。

目前流量整形算法主要有三种,计数器,漏桶,令牌桶。分别看下咯!

1:计数器

1.1:描述

单位时间内只允许指定数量的请求,如果是时间区间内超过指定数量,则直接拒绝,如果时间区间结束,则重置计数器,开始下一个时间区间。
在这里插入图片描述

1.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;

import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// 计速器 限速
public class CounterLimiter {
    // 起始时间
    private static long startTime = System.currentTimeMillis();
    // 时间区间的时间间隔 ms
    private static long interval = 1000;
    // 每interval时间内限制数量
    private static long maxCount = 2;
    //累加器
    private static AtomicLong accumulator = new AtomicLong();

    // 计数判断, 是否超出限制
    private static long tryAcquire(long taskId, int turn) {
        long nowTime = System.currentTimeMillis();
        //在时间区间之内
        if (nowTime < startTime + interval) {
            long count = accumulator.incrementAndGet();

            if (count <= maxCount) {
                System.out.println("taskId: " + taskId + " 正常执行!");
                return count;
            } else {
                // 返回-1说明时间区间内被限制了
//                return -count;
                System.out.println("时区内达到次数咯!");
                return -1;
            }
        } else {
            //在时间区间之外
            synchronized (CounterLimiter.class) {
                System.out.println("新时间区到了,taskId:" + taskId + ", turn {}.." + turn);
                // 再一次判断,防止重复初始化
                if (nowTime > startTime + interval) {
                    accumulator.set(0);
                    startTime = nowTime;
                }
            }
            return 0;
        }
    }
    final int threads = 1;

    //线程池,用于多线程模拟测试
//    private ExecutorService pool = Executors.newFixedThreadPool(10);
    private ExecutorService pool = Executors.newFixedThreadPool(threads);

    @Test
    public void testLimit() {
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
//        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        long taskId = Thread.currentThread().getId();
                        long index = tryAcquire(taskId, j);
//                        if (index <= 0) {
                        if (index == -1) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();

            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果

        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time);
    }

}

输出:
在这里插入图片描述

1.3:优缺点

  • 优点
简单
  • 缺点
无法处理流量分配不均匀的情况,可能导致大量的请求被拒绝

1.4:适用场景

流量比较平稳业务场景。比如我司的机器人外呼业务,因为是程序在跑,所以流量很稳定,一旦业务配置导致流量增高,则可以使用该算法进行限流。

但对于突发流量场景,可能会因为很短时间内的突发流量就导致计数器达到最大值,从而时间区间内的剩余时间所有请求全部丢弃,这也存在着被攻击的风险。
在这里插入图片描述

2:漏桶

2.1:描述

水(对应请求)从进水口进入到漏桶里,漏桶以一定的速度出水(请求放行),当水流入速度过大,桶内的总水量大于桶容量会直接溢出,请求被拒绝,如图所示:
在这里插入图片描述

2.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;

import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// 漏桶 限流
public class LeakBucketLimiter {

    // 计算的起始时间
    private static long lastOutTime = System.currentTimeMillis();
    // 流出速率 每100毫秒漏2次
    private static int leakRate = 1;
//    private static int leakRate = 2000;

    // 桶的容量
    private static int capacity = 5;

    //剩余的水量
    private static AtomicInteger water = new AtomicInteger(0);

    //返回值说明:
    // false 没有被限制到
    // true 被限流
    public static synchronized boolean isLimit(long taskId, int turn) {
        // 如果是空桶,就当前时间作为漏出的时间
        if (water.get() == 0) {
            lastOutTime = System.currentTimeMillis();
            water.addAndGet(1);
            return false;
        }
        // 执行漏水
//        int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
        int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 100)) * leakRate;
        // 计算剩余水量,当前的量减去漏出去的量就是剩余的量
        int waterLeft = water.get() - waterLeaked;
        // 要注意:剩余的量最小是0
        water.set(Math.max(0, waterLeft));
        // 重新更新leakTimeStamp
        lastOutTime = System.currentTimeMillis();
        // 尝试加水,并且水还未满 ,放行
        if ((water.get()) < capacity) {
            System.out.println("水未满,成功加水");
            water.addAndGet(1);
            return false;
        } else {
            System.out.println("水已满,水溢出");
            // 水满,拒绝加水, 限流
            return true;
        }

    }

    final int threads = 1;

    //线程池,用于多线程模拟测试(负责加水)
    private ExecutorService pool = Executors.newFixedThreadPool(threads);
    private ExecutorService outWaterPool = Executors.newFixedThreadPool(threads);

    @Test
    public void testLimit() {

//        new Thread(() -> {
//            for (int i = 0; i < 1000; i++) {
//                if (water.get() > 0) {
//                    System.out.println("出水了");
//                    water.decrementAndGet();
//                } else {
//                    System.out.println("无水可出了");
//                }
//                try {
//                    TimeUnit.MILLISECONDS.sleep(100);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//            }
//        }).start();
        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
//        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;
        // 线程同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        long taskId = Thread.currentThread().getId();
                        boolean intercepted = isLimit(taskId, j);
                        if (intercepted) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();

            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果

        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time);
    }
}

运行:

水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
限制的次数为:0,通过的次数为:20
限制的比例为:0.0
运行的时长为:4.136

Process finished with exit code 0

此时因为水流出的速度快于流入的速度,所以,一直可以成功加水,可以修改leakRate=0,再运行:

水未满,成功加水
水未满,成功加水
水未满,成功加水
水未满,成功加水
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
水已满,水溢出
限制的次数为:15,通过的次数为:5
限制的比例为:0.75
运行的时长为:4.176

Process finished with exit code 0

就可以看到水满溢出的情况了。

2.3:优缺点

  • 优点
可应对突发流量,避免服务被冲垮,从而起到保护服务的作用
  • 缺点
因为出口速率固定,所以当服务能力提升时,无法自动匹配后端服务的能力提升

2.4:适用场景

3:令牌桶

3.1:描述

有一个固定容量的令牌桶,按照一定的速率(可以调节)向令牌桶中放入令牌,请求想要被执行,必须能够从令牌桶中获取到令牌,否则将会被抛弃,参考下图:
在这里插入图片描述

3.2:程序

package com.dahuyou.algrithm.triffic.shaper.counter;

//import lombok.extern.slf4j.Slf4j;

import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// 令牌桶 限速
//@Slf4j
public class TokenBucketLimiter {
    // 上一次令牌发放时间
    public long lastTime = System.currentTimeMillis();
    // 桶的容量
    public int capacity = 2;
    // 令牌生成速度 /s,如果是调大令牌的生成速度,则服务能力也会得到提高(在服务扛得住的前提下)
    public int rate = 2;
    // 当前令牌数量
    public AtomicInteger tokens = new AtomicInteger(0);

    //返回值说明:
    // false 没有被限制到
    // true 被限流
    public synchronized boolean isLimited(long taskId, int applyCount) {
        long now = System.currentTimeMillis();
        //时间间隔,单位为 ms
        long gap = now - lastTime;

        //计算时间段内的令牌数
        int reverse_permits = (int) (gap * rate / 1000);
        int all_permits = tokens.get() + reverse_permits;
        // 当前令牌数(固有的令牌加上时间段内新产生的令牌就是当前真实的令牌数啦),
        // 因为令牌桶也有固定的数量所以要取下最小值
        tokens.set(Math.min(capacity, all_permits));
//        log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);
//        System.out.println("tokens " + tokens + " capacity " + capacity + " gap  " + gap);

        /**
         * 如果申请的数量大于可用令牌数,则拒绝,否则发放令牌,执行请求
         */
        if (tokens.get() < applyCount) {
            System.out.println("没有辣么多令牌啦!!!");
            // 若拿不到令牌,则拒绝
            // log.info("被限流了.." + taskId + ", applyCount: " + applyCount);
            return true;
        } else {
            System.out.println("令牌拿去撒!!!");
            // 还有令牌,领取令牌
            tokens.getAndAdd( - applyCount);
            lastTime = now;

            // log.info("剩余令牌.." + tokens);
            return false;
        }
    }

    //线程池,用于多线程模拟测试
    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLimit() {

        // 被限制的次数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程的执行轮数
        final int turns = 20;


        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        long taskId = Thread.currentThread().getId();
                        boolean intercepted = isLimited(taskId, 1);
                        if (intercepted) {
                            // 被限制的次数累积
                            limited.getAndIncrement();
                        }

                        Thread.sleep(200);
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有线程结束
                countDownLatch.countDown();

            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //输出统计结果

        System.out.println("限制的次数为:" + limited.get() +
                ",通过的次数为:" + (threads * turns - limited.get()));
        System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("运行的时长为:" + time);
    }
}

输出:
在这里插入图片描述
展示的是既有申请到令牌也有没有申请到令牌的场景,修改代码public int rate = 2000;给令牌发放一个非常大的速度,此时就会一直可以拿得到令牌:
在这里插入图片描述
修改程序public int rate = 0;直接不发放令牌,就可以看到令牌全部申请失败的场景:
在这里插入图片描述

3.3:优缺点

  • 优点
1:因为令牌桶容量有限制,所以可以应对突发流量
2:服务QPS增加或者降低时只需要对应调整令牌的发放速度即可适配
  • 缺点

3.4:适用场景

写在后面

参考文章列表

限流:计数器、漏桶、令牌桶 三大算法的原理与实战(史上最全) 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2124027.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用Python实现时间序列模型实战——Day 17: 时间序列模型的评估方法

一、学习内容 1. 预测误差的评估指标 在时间序列预测中&#xff0c;常用的评估指标包括 MAE (Mean Absolute Error), MSE (Mean Squared Error) 和 RMSE (Root Mean Squared Error)。这些指标用于衡量模型的预测误差。 MAE (Mean Absolute Error)&#xff1a; MAE 衡量预测值…

mysql学习教程,从入门到精通,MySQL WHERE 子句(10)

1、SQL WHERE 子句 在本教程中&#xff0c;您将学习如何使用SQL从表中选择特定记录。 根据条件选择记录 在上一章中&#xff0c;我们学习了如何从表或表列中获取所有记录。但是&#xff0c;在现实世界中&#xff0c;我们通常只需要选择&#xff0c;更新或删除满足某些条件的那…

HarmonyOS---应用测试概述

一、应用质量要求 应用质量要求分为应用体验质量建议和应用内容合规要求两大部分。 1、应用体验质量建议 功能数据完备、基础体验要求、HarmonyOS特征增强体验要求。 &#xff08;1&#xff09;功能数据完备 &#xff08;2&#xff09;基础体验要求 &#xff08;3&#xff09;增…

盘古信息IMS 驱动智能工厂建设,助力制造企业降本增效

随着全球化的加剧和市场竞争的日益激烈&#xff0c;制造业面临着提高生产效率、降低成本、提升产品质量以及实现绿色可持续发展等多重压力。智能工厂是利用人工智能、大数据、物联网、云计算等新技术&#xff0c;实现工厂的智能化、自动化、可持续化发展的新型制造业模式&#…

《深度学习》OpenCV 高阶 图像直方图、掩码图像 参数解析及案例实现

目录 一、图像直方图 1、什么是图像直方图 2、作用 1&#xff09;分析图像的亮度分布 2&#xff09;判断图像的对比度 3&#xff09;检测图像的亮度和色彩偏移 4&#xff09;图像增强和调整 5&#xff09;阈值分割 3、举例 二、直方图用法 1、函数用法 2、参数解析…

芋道快速开发平台的使用心得

1.前端版本 项目的管理后台有 4 个版本&#xff1a; yudao-ui-admin-vue3 (opens new window)&#xff1a;基于 Vue3 element-plus yudao-ui-admin-vben (opens new window)&#xff1a;基于 Vue3 vben(ant-design-vue) yudao-ui-admin-vue2 (opens new window)&#xff…

LaTeX中制作表格【表格数据自动换行】(附latex源码)

一、latex软件推荐 在使用LaTeX进行文档编写时&#xff0c;有几款非常受欢迎且功能强大的软件和在线编辑器可供选择。以下是一些推荐的LaTeX软件&#xff1a; 1. Overleaf(强烈推荐) Your Projects - Overleaf, Online LaTeX Editorhttps://www.overleaf.com/project 2.Tex…

图新说【消防】(一步步的做好态势标绘)

0.序 越来越多的消防战士使用图新说来做消防预案&#xff0c;态势标绘比武等。 图新说即可应用于具体的消防救援任务&#xff0c;制作具体的作战方案&#xff0c;让战士的配合更加紧密。 也可以做大型的消防预案&#xff0c;针对辖区内的重点建筑、危化存储区提前做应急救援方…

最新版MYMPay码支付开源版系统源码_个人免签支付_聚合支付系统

最新版MYMPay码支付开源版系统源码_个人免签支付_聚合支付系统 安装环境&#xff1a; PHP&#xff1a;7.0-8.2 (推荐使用7.4) 需要安装Xload 扩展 MySQL&#xff1a;5.6版本 访问 http://你的域名/install 进行安装 后台地址&#xff1a;http://你的域名/Admin/ 账号&am…

C++笔记之map的实用操作

C++笔记之map的实用操作 文章目录 C++笔记之map的实用操作1.初始化1.1.使用列表初始化1.2.使用 `insert` 方法1.3.使用 `emplace` 方法1.4.复制构造1.5.移动构造2.赋值2.1.列表赋值2.2.插入元素2.3.批量插入3.取值3.1.使用 `[]` 操作符3.2.使用 `at()` 方法3.3.检查键是否存在3…

C++设计模式——State状态模式

一&#xff0c;状态模式的定义 状态模式是一种行为型设计模式&#xff0c;状态模式允许对象在内部状态发生切换时改变它自身的行为。 状态模式的主要目的是将复杂的状态切换逻辑抽象化为一组离散的状态类&#xff0c;使代码结构更加清晰和易于维护。 状态模式将对象的行为封…

【spring】IDEA 新建一个spring boot 项目

参考新建项目-sprintboot 选择版本、依赖,我选了一堆 maven会重新下载一次么?

Vue(8)——v-model原理

v-model 原理:v-model本质上是一个语法糖。例如应用在输入框上&#xff0c;就是value和input事件的合写。 作用&#xff1a;提供数据的双向绑定。 数据变&#xff0c;视图跟着变 :value视图变&#xff0c;数据跟着变 input <template><div><input v-model&quo…

【智能体】浅谈大模型之AI Agent

随着ChatGPT推出插件和函数调用功能&#xff0c;构建以LLM&#xff08;大语言模型&#xff09;为核心控制器的AI Agent愈发成为一个拥有无限可能的概念。 AI Agent是一种超越简单文本生成的人工智能系统。它使用大型语言模型&#xff08;LLM&#xff09;作为其核心计算引擎&am…

如何将JSON字符串里面的某个字段的json字符串格式转成json对象?

目录标题 背景临时方案最好的方案 背景 下游传过来的数据是一个json字符串&#xff0c;这个json字符串里面有的字段又套着json字符串&#xff01;还有一些字段直接是null传过来的&#xff01;现在要去掉null&#xff0c;且将一些json字符串&#xff01;尽可能的换成json对象&a…

UE4_后期处理_后期处理材质及后期处理体积一

后期处理效果 在渲染之前应用于整个渲染场景的效果。 后期处理效果&#xff08;Post-processing effect&#xff09;使美术师和设计师能够对影响颜色、色调映射、光照的属性和功能进行组合选择&#xff0c;从而定义场景的整体外观。要访问这些功能&#xff0c;可以将一种称为…

[转]什么是工作流,flowable 与 Activiti对比

工作流 什么是工作流工作流是复杂版本的状态机Java工作流开源框架工作流对比 Activiti 设计器Flowable 兼容性Camunda 设计器兼容性&#xff1a;小结&#xff1a;社区活跃度 FlowableActivitiCamunda总结 什么是工作流 工作流&#xff0c;是指“业务​过程的部分或整体在​计算…

面试题:try和finally中都出现了return语句会发生什么情况

答&#xff1a;会导致提前返回&#xff0c;即执行finally中的return语句&#xff0c;try中的return语句不会被执行。 原因&#xff1a;当try中有return语句时&#xff0c;会延迟返回&#xff0c;即先执行完finally的代码&#xff0c;再执行try的return语句&#xff0c;这样做的…

在windows下抓空包(monitor网卡+wareshark+MNM)

当我们的电脑是通过网线联网时&#xff0c;我们可以通过wareshark来抓取通过网口发送和接收到的包&#xff0c;其中包括单播包、多播包以及广播包等等&#xff0c;只要这个包是通过目标网口的。 但是&#xff0c;如果是无线包呢&#xff1f;我们的无线包其实也是通过无线网卡来…

多目标优化及其MATLAB实现

目录 引言 多目标优化的数学模型 非劣解与Pareto前沿 多目标优化求解方法 MATLAB多目标优化工具 多目标规划中的重要概念 表格总结&#xff1a;常见多目标优化方法及其特点 MATLAB在多目标优化中的应用 结论 引言 多目标优化问题在实际应用中非常常见&#xff0c;因为…