rocketmq实现限流

news2025/1/11 9:50:14

目录

问题背景

技术方向

方案确认

消息队列(√)

分布式锁(×)

方案实现

监控方向

业务方向


问题背景

公司邮件服务token有 分钟内超200封的熔断机制,当前token被熔断后,系统发邮件操作会被忽略,所以邮件服务也没有重试操作

人工发现token被熔断后,需要联系邮件群中值班人,将token恢复

分货业务依赖邮件来查看分货通知以及结果,并且分货层层依赖,如果不能及时收到邮件会影响业务的分货时效等,所以通过三个方面去解决这个问题

技术方向

系统内发邮件收口做限流

方案确认

方向:限流发邮件方法1分钟内最大200次

实现:改造系统发邮件底层方法,1分钟内最多发200个

消息队列(√)

面临问题:多出来的怎么处理?消息队列(需要持久化)

实现:新建一个topic,调用发邮件方法的请求全部扔到MQ中,自己消费,通过设置消费者的拉取间隔以及最大拉取数量限制,分钟内消费消息条数不超过200条

面临问题:多分区多消费者?

实现:默认拉取数量为32,目前MQ服务端设置,限制最大拉取数量为32

(可行)设置1个分区,一个消费者组,目前有2个实例(此时其中一个实例不会消费),设置拉取间隔为10s

(不可行,有自动加实例机制)设置2个分区,一个消费者组,目前有2个实例,设置拉取间隔为10s,最大拉取条数为16;系统在流量激增的情况下会增加实例来分摊流量

最终实现方式

topic设置1个分区,一个消费者组,使用默认负载均衡策略:平均分配

//平均分配负载均衡核心逻辑
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);

for(int i = 0; i < range; ++i) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

return result;

解析两个实例负载均衡过程

//第一个实例起来,触发负载均衡
//index = 0
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 0 * 1 = 0
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 0) = 1
int range = Math.min(averageSize, mqAll.size() - startIndex);

for(int i = 0; i < range; ++i) {
    //(0+0)%1 = 0,所以将第一个分区分给当前实例
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

//第二个实例起来,触发负载均衡
//index = 1
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 1 * 1 + 1 = 2
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 2) = -1
int range = Math.min(averageSize, mqAll.size() - startIndex);

//不会进入循环分配分区
for(int i = 0; i < range; ++i) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

所以只会有一个实例去消费当前这个分区,在集群消费模式下,一个分区只会被消费组内的一个消费者消费,rmq默认拉取数量为32,设置拉取间隔为10s,所以每分钟内消费:32*6 = 192

分布式锁(×)

当前场景的点,在于需要将超出1分钟两百条的那些邮件持久化存储,等到下一个一分钟去发送,而分布式锁只能实现控制接口的流量,没法保证超出流量那部分的存储,所以没法解决当前问题

方案实现

最终采用消息队列,RocketMQ解决该问题

实现代码,使用Java SDK,设置拉取间隔为10s即可

public void run(String... args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty(ConfigKey.CONSUMER_GROUP, emailNotifyMqProperties.getConsumerGroup());
    properties.setProperty(ConfigKey.ACCESS_KEY, rocketMqProperties.getAccessKey());
    properties.setProperty(ConfigKey.SECRET_KEY, rocketMqProperties.getSecretKey());
    properties.setProperty(ConfigKey.NAME_SERVER_ADDR, rocketMqProperties.getServer());
    properties.setProperty(ConfigKey.ENABLE_MSG_TRACE, "true");
    //消费限流:解决发邮件分钟内超过200封会被熔断的问题
    properties.setProperty(ConfigKey.PULL_INTERVAL, "10000");

    NormalConsumer consumer = ClientFactory.createNormalConsumer(properties, this::consumeMessage);

    consumer.subscribe(emailNotifyMqProperties.getTopic(), null);
    consumer.start();
}

监控方向

1. 系统日志报警,配置邮件发送失败报警

2. 关注token熔断消息通知

业务方向

梳理当前系统中邮件通知的场景,分析报警内容,从以下方向减少邮件次数发送

1. 用户是否需要关注(用户长时间使用下来,部分通知发现自己并不关注的,比如节点报错可重试成功的

2. 是否可以批量发送(多条通知集合到一条邮件发送:多用户,多单据等)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1906983.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于微信支付-商户平台:查询订单提示“查询失败:操作失败,请稍候重试”的分析

目录 引子 分析 应对 小结 引子 在开发和实施微信 JSAPI 支付的应用后&#xff0c;我们遇到了一些问题&#xff0c;订单的状态更新不正常&#xff0c;当然我们首先需要从自身寻找原因和完善解决问题的办法和方案。在支付的过程中&#xff0c;客户会给我们一些反馈&#xf…

K8S篇之Ingress详解以及用法说明

一、Ingress简介 Ingress 是 Kubernetes 中用于管理和配置从集群外部访问集群内部服务的资源对象。它通过定义路由规则来控制外部流量的访问方式&#xff0c;支持基于 HTTP 和 HTTPS 的高级路由功能和安全性配置。 Ingress是一种HTTP方式的路由转发机制&#xff0c;为K8S服务配…

【力扣高频题】014.最长公共前缀

经常刷算法题的小伙伴对于 “最长”&#xff0c;“公共” 两个词一定不陌生。与此相关的算法题目实在是太多了 &#xff01;&#xff01;&#xff01; 之前的 「动态规划」 专题系列文章中就曾讲解过两道相关的题目&#xff1a;最长公共子序列 和 最长回文子序列 。 关注公众…

SpringCloud 负载均衡

目录 一、负载均衡 1、问题 2、什么是负载均衡 服务端负载均衡 客户端负载均衡 二、Spring Cloud LoadBalance 1、使用 Spring Cloud LoadBalance 2、负载均衡策略 3、LoadBalancer 原理 一、负载均衡 1、问题 我们来看一下前面写的代码&#xff1a; List<Serv…

【易捷海购-注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞…

Drools开源业务规则引擎(五)- jBPM流程图元素介绍

文章目录 Drools开源业务规则引擎&#xff08;五&#xff09;- jBPM流程图元素介绍1.process2.startEvent3.Connections3.1.sequenceFlow3.2.Association 4.Activities4.1.businessRuleTask4.2.scriptTask 5.Gateways5.1.exclusiveGateway 6.endEvent Drools开源业务规则引擎&a…

Windows netstat命令详解,Windows查看网络连接

「作者简介」&#xff1a;冬奥会网络安全中国代表队&#xff0c;CSDN Top100&#xff0c;就职奇安信多年&#xff0c;以实战工作为基础著作 《网络安全自学教程》&#xff0c;适合基础薄弱的同学系统化的学习网络安全&#xff0c;用最短的时间掌握最核心的技术。 netstat 常用来…

支持图片识别语音输入的LobeChat保姆级本地部署流程

文章目录 前言1. LobeChat对我们有哪些帮助?2. 本地安装LobeChat3. 如何使用LobeChat工具4. 安装Cpolar内网穿透5. 实现公网访问LobeChat6. 固定LobeChat公网地址 前言 本文主要介绍如何在Windows系统电脑本地部署LobeChat&#xff0c;一款高颜值的开源AI大模型智能应用&…

【删库跑路】一次删除pip下载的所有第三方库方法

进入命令行&#xff0c;先list看下库存 pip list导出所有的第三方库至一文件列表 pip freeze >requirements.txt按照列表卸载所有库 pip uninstall -r requirements.txt -y再list看下&#xff0c;可见库存已清空

MATLAB基础应用精讲-【数模应用】 岭回归(Ridge)(附MATLAB、python和R语言代码实现)

目录 前言 算法原理 数学模型 Ridge 回归的估计量 Ridge 回归与标准多元线性回归的比较 3. Ridge 参数的选择 算法步骤 SPSSPRO 1、作用 2、输入输出描述 3、案例示例 4、案例数据 5、案例操作 6、输出结果分析 7、注意事项 8、模型理论 SPSSAU 岭回归分析案…

第三届机器学习、云计算与智能挖掘国际会议(MLCCIM 2024)

随着科技的不断进步&#xff0c;机器学习和挖掘技术已成为推动现代社会发展的重要力量。本届机器学习、云计算与智能挖掘国际会议&#xff08;MLCCIM&#xff09;将于2024年8月8日至8月11日在中国湖北省神农架盛大召开&#xff0c;旨在汇聚全球智慧&#xff0c;共同探讨这一领域…

Linux安装elasticsearch单机版

一、检查内核 uname -a uname -m 二、下载版本 下载版本选择自己服务器相同的内核版本 我这边是aaech64 ES下载地址 Kibana 下载地址 二、上传服务器解压 tar -xvf elasticsearch-8.14.1-linux-aarch64.tar.gz 三、安装ES 因为ES不能用root用户启动先创建用户 #新增 es …

[python]Markdown图片引用格式批处理桌面应用程序

需求 使用python编写一个exe&#xff0c;实现批量修改图片引用&#xff0c;将修改后的文件生成为 文件名_blog.md。有一个编辑框&#xff0c;允许接收拖动过来md文件&#xff0c;拖入文件时获取文件路径&#xff0c;有一个编辑框编辑修改后的文件的输出路径&#xff0c;用户拖入…

抖音商城自定义小程序源码系统 前后端分离 带完整的源代码包以及搭建教程

系统概述 在当今数字化时代&#xff0c;电商平台的便捷性和个性化体验成为了吸引用户的关键。随着短视频平台的兴起&#xff0c;抖音作为其中的佼佼者&#xff0c;其商城小程序成为了商家连接消费者的新阵地。为了帮助商家快速构建个性化、高效的小程序店铺&#xff0c;本文将…

Java面试题--JVM大厂篇之深入探讨Serial GC的应用场景

目录 引言: 正文: 一、什么是Serial GC&#xff1f; 二、Serial GC的工作原理 三、适用场景 单处理器环境在单处理器环境下&#xff0c;Serial GC是一个非常好的选择。由于没有多余的处理器资源&#xff0c;单线程的垃圾回收操作不会导致额外的上下文切换开销&#xff0c…

springboot非物质文化遗产管理系统-计算机毕业设计源码16087

目录 摘要 1 绪论 1.1 选题背景与意义 1.2国内外研究现状 1.3论文结构与章节安排 2系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1系统开发流程 2.2.2 用户登录流程 2.2.3 系统操作流程 2.2.4 添加信息流程 2.2.5 修改信息流程 2.2.6 删除信息流程 2.3 系统功能…

欧拉系统(openEuler) aarch64虚拟机安装

文章目录 一、操作背景二、资源准备三、文件路径四、安装QEMU五、创建磁盘文件六、安装虚拟机七、连接虚拟机八、启动虚拟机 一、操作背景 客户公司扫描出欧拉系统aarch64架构服务器存在编号 CVE-2024-1086 的内核漏洞&#xff0c;需要对内核升级&#xff0c;首先在个人电脑虚…

硬盘错误0x80071ac3如何修复?5大免费修复法,轻松找回硬盘数据

今天我们要聊的是一个让大家头疼不已的问题——硬盘错误0x80071ac3。你是否也曾经遇到过这个烦人的错误代码&#xff0c;导致数据无法读取、文件丢失&#xff0c;甚至整个硬盘都无法正常使用&#xff1f;别担心今天小编就为大家详细解析这个错误的原因&#xff0c;并分享5个免费…

手机数据恢复:如何在没有root的情况下恢复Android数据?

您是否不小心从Android设备中删除了重要数据&#xff1f;您是否担心如何取回您的照片、视频和文档&#xff1f;有时&#xff0c;我们不小心删除了重要数据&#xff0c;并使用Android root方法取回文件。许多用户不喜欢root他们的Android设备&#xff0c;因为这是一种复杂的方法…

第十一节 动态面板加密解密显示

在原型中我们经常会遇到文件加密与解密显示问题&#xff0c;下面以一个简单案例来说明实现怎么切换明文与密文不同显示方式案例说明&#xff1b; 1、添加动态面板 2、设置加密与不加密 3、添加动作事项 注意为可见时要设置面板状态向前循环&#xff0c;上一项&#xff0c;否则…