【消息中间件】RocketMQ如何实现Producer的负载均衡

news2025/1/8 4:08:20

目录

一、前言

二、实现Producer的负载均衡

1、负载均衡选取一条消息队列并且高可用

1.1、模拟随机递增取模消息队列数为5

1.2、模拟随机递增取模消息队列数为6

1.3、判断Broker代理是否可用

2、更新故障项维护startTimestamp字段

2.1、退避运算

2.2、更新故障项维护startTimestamp字段

3、总结


一、前言

    Producer端在发送消息的时候,会先根据主题Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550L ms,就退避30000L ms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

二、实现Producer的负载均衡

 上一节我们讲解到了生产者消息发送与重试的那块逻辑,也提到了容错策略,本篇文章我们先回顾一下以便接下来的内容分析。更新故障项updateFaultItem()方法中接收的数据为Broker代理的服务名、第二个入参数据就是发送消息远程调用到响应成功或失败所花的时间、第3个是boolean类型(成功或者中断异常情况下为false、其它情况下为true)

1、负载均衡选取一条消息队列并且高可用

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                // 随机递增计算索引
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                // 轮询,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 取模
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }

                // 由于更新故障项,可能导致上面判断broker代理都不可用的情况,但不是绝对的,这里选取一个broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    // 同样是采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

该方法的作用就是负载均衡选取一条高可用的消息队列。

  1. sendLatencyFaultEnable开关打开,采用轮询的方式;采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样,即负载均衡;判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在
  2. 由于更新故障项,可能导致上面判断broker代理都不可用的情况,但不是绝对的,这里选取一个broker;同样是采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息

1.1、模拟随机递增取模消息队列数为5

    public static void main(String[] args) {
        ThreadLocalIndex threadLocalIndex = new ThreadLocalIndex();
        for (int i = 0; i < 10; i++) {
            int incrementAndGet = threadLocalIndex.incrementAndGet();
            // 模拟消息队列数为5
            int pos1 = Math.abs(incrementAndGet++)%5;
            System.out.println("第 " + i + " 为:" + incrementAndGet + "  5选取消息队列为:" + pos1);
        }
    }

实验结果:

实验结论:

由实验结果可以看出随机递增取模的方式选取队列,每次选取都是不同的消息队列,这就是负载均衡,也是轮询。

1.2、模拟随机递增取模消息队列数为6

    public static void main(String[] args) {
        ThreadLocalIndex threadLocalIndex = new ThreadLocalIndex();
        for (int i = 0; i < 10; i++) {
            int incrementAndGet = threadLocalIndex.incrementAndGet();
            // 模拟消息队列数为6
            int pos2 = Math.abs(incrementAndGet++)%6;
            System.out.println("第 " + i + " 为:" + incrementAndGet + "  6选取消息队列为:" + pos2);
        }
    }

实验结果:

实验结论:

同样由实验结果可以看出随机递增取模的方式选取队列,每次选取都是不同的消息队列,这就是负载均衡,也采用了轮询。虽然说这样可以保证在上次发送成功或失败的情况下不再选取同一条消息队列,即多负载了或认为其不可用,但是你仅通过轮询以及随机递增取模是不可以保证下一条消息队列就是可用的。故判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

1.3、判断Broker代理是否可用

    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }

    /**
     * 故障项
     */
    class FaultItem implements Comparable<FaultItem> {
        /**
         * brokerName
         */
        private final String name;
        /**
         * 这次发送消息到出现异常的时间
         */
        private volatile long currentLatency;
        /**
         * 在这个时间点以前,这个brokerName都会标记为可能存在故障
         */
        private volatile long startTimestamp;

        public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
    }
  • 判断brokerName对应下Broker代理是否可用,不是绝对的。
  • startTimestampLatencyFaultToleranceImpl内部类故障项FaultItem中维护的字段,作用是在这个时间点以前,这个brokerName对应下Broker代理都会标记为可能存在故障,即不可用。
  • 为了避免选择故障项,startTimestamp需要通过setter更新维护,而故障项如果恢复的话,你也要更新维护。那么维护它的时机是在哪里呢?前面有提及了更新故障项,那么是否也在该时机执行呢?

2、更新故障项维护startTimestamp字段

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    /**
     * 潜伏期最大值
     */
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    /**
     * 不可用持续时间
     */
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};


    /**
     * 该方法在发送消息成功或失败后执行
     *
     * @param brokerName 代理服务器服务名
     * @param currentLatency 当前等待时间 = 执行远程调用完毕时间 - 开始时间
     * @param isolation isolation
     */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            // 计算不可用时间
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // 延迟错误误差
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

latencyMaxnotAvailableDuration是一一对应关系:latencyMax中数值越小即整个请求处理成功或失败所花的时间越少,越少那么往往都是成功的,故对应notAvailableDuration的值越小甚至0,以便isAvailable()判断Broker代理是否可用不过滤;相反,latencyMax中数值越大,那么故障的可能性越大,这时就尽量不再选择它,退避处理。

2.1、退避运算

    private long computeNotAvailableDuration(final long currentLatency) {
        // 倒序,一旦发送失败,那么尽最大可能避免再次选择发送失败的服务器
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }

这里逻辑就是退避运算:这里数组索引是倒序的,一旦发送失败,那么尽最大可能避免再次选择发送失败的服务器。感兴趣的读者可以自己去模拟下实验,拿几组数据自己去探究探究。

2.2、更新故障项维护startTimestamp字段

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // 根据代理服务器服务名brokerName,从本地缓存获取故障项
        FaultItem old = this.faultItemTable.get(name);
        // 没有新建再缓存
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            //
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            // 存在则直接更新
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

根据代理服务器服务名brokerName,从本地缓存获取故障项,没有新建再缓存;有则直接setter关系维护startTimestamp字段

3、总结

本篇文章涉及到的重点就是负载均衡的算法实现以及退避运算中涉及的设计思想等:采用轮询的方式;采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样,即负载均衡;判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在,内部两个二维数组字段参与到退避运算中,计算出的结果维护到startTimestamp字段,以便判断broker代理是否可用、达到高可用目的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/125826.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

干货 | 数字经济创新创业——数字经济下的商业模式与解决方案

下文整理自清华大学大数据能力提升项目能力提升模块课程“Innovation & Entrepreneurship for Digital Economy”&#xff08;数字经济创新创业课程)的精彩内容。主讲嘉宾&#xff1a;Kris Singh: CEO at SRII, Palo Alto, CaliforniaVisiting Professor of Tsinghua Unive…

卷积、转置卷积、膨胀卷积学习记录

Conv计算&#xff1a; 计算公示 1、pytorch中默认参数&#xff0c;以conv1d为例 torch.nn.Conv1d(in_channels, out_channels, kernel_size, stride1, padding0, dilation1, groups1, biasTrue, padding_mode‘zeros’, deviceNone, dtypeNone&#xff09; 2、输出卷积尺寸&am…

MySQL常见内置函数及其使用

目录 1、聚合函数 2、日期函数 3、字符串函数 4、数学函数 5、其它函数 1、聚合函数 函数说明 COUNT([DISTINCT] expr) 返回查询到的数据的 数量SUM([DISTINCT] expr)返回查询到的数据的 总和&#xff0c;不是数字没有意义AVG([DISTINCT] expr)返回查询到的数据的 平均值…

数据蛙恢复软件替代产品有哪些?15款顶尖数据恢复软件清单

数据蛙恢复软件是一款国内数据恢复软件&#xff0c;可以在很多品牌的电脑上使用。但是你可能会遇到数据蛙恢复软件扫描不到需要恢复文件的情况。那么有没有更专业的数据恢复软件可以找到更多误删数据&#xff1f;本文将为你介绍最值的推荐的15个数据蛙恢复软件替代产品。 丢失…

Web兼容性测试的要点

对于网页的兼容性我们主要考虑的是各种浏览器对前台页面的兼容性&#xff0c;因为浏览器对页面的影响是最大的。 现在浏览器的种类越来越多&#xff0c;网页中展现出来的内容也越来越丰富&#xff0c;这些内容包括网页中的字体、图片、动画等&#xff0c;而且有些内容需要网页…

AlexNet学习笔记(2)

里面 有些东西 对于现在来说都是错误的 而且由大量的细节对于现在来说没有必要 而且是过度的enginnering 一篇论文的第一段通常是讲一则故事 我们在做什么研究 哪个方向 有什么东西然后为什么很重要 正则化 regularization好像没有那么重要&#xff0c;并不是最关键的 最关键…

前端监控系统的搭建

UI自动化测试库 puppeteer&#xff1a;https://zhuanlan.zhihu.com/p/524254998 - google出品 cypress - 据说比puppeteer好用 前端监控体系 性能监控 异常监控 行为监控&#xff1a;埋点体系 主动监控 被动监控 前端性能数据捕获&#xff1a; 打点方式&#xff0c;结…

pyqt5 QPainter绘制图形,并旋转

PyQt5 的绘图系统使用户可以在屏幕或打印设备上用相同的 API 绘图&#xff0c;QPainter 是用来进行 绘图操作的类&#xff0c;一般的绘图设备包括 QWidget、QPixmap、QImage 等&#xff0c;这些绘图设备为 QPainter 提供了一个“画布” QWidget 类是所有界面组件的基类&#xf…

【细节注入模型】

Detail-Injection-Model-Inspired Deep Fusion Network for Pansharpening &#xff08;细节注入模型启发的深度融合网络全色锐化算法&#xff09; 全色锐化是一种图像融合方法&#xff0c;其目的是将低空间分辨率的多光谱&#xff08;MS&#xff09;图像与高空间分辨率的全色…

FFmpeg学习笔记--FFplay简单过滤器、FFmpeg命令参数

目录 1--FFplay简单过滤器 2--FFmpeg命令参数 2-1 主要参数 2-1-1 -i设定输入流 2-1-2 -f设定输出格式 2-1-3 -ss设定开始时间 2-1-4 -t设定时间长度 2-1-5 代码实例 2-2 音频参数 2-2-1 -aframes设置输出的音频帧数 2-2-2 -b:a设置音频码率 2-2-3 -ar设置音频采样…

SVG 安全

一 任务目标 本篇文章的目的有&#xff1a;[ ] 了解 SVG 漏洞[ ] 了解 SVG 常见防护手段[ ] 搜寻 SVG 数据安全性校验和过滤的库[ ] 了解如何使用此类库来进行 SVG 上传防护[ ] 阅读源码&#xff0c;能明确讲述此类库做了什么如果对你有所帮助&#xff0c;不妨点赞、评论、收藏…

Windows系统安装Git教程

今天给大家介绍Windows系统安装Git命令。 一、Git版本控制工具简介 git是一个开源的分布式版本控制系统。所谓版本控制系统&#xff0c;是开发者最重要的工具之一&#xff0c;可以有效的解决版本的同步以及不同开发者之家的通信问题&#xff0c;提升协同开发的效率。版本控制…

JAVA数据类型及转换

一、数据类型 数据类型字节数byte字节型占用1个字节取值范围&#xff1a;-27 ~ 27-1-128~127short短整型占用2个字节取值范围&#xff1a;-215 ~ 215-1-32768~32767&#xff0c;在实际开发中使用较少int整型占用4个字节取值范围&#xff1a;-231 ~ 231-1-2147483648-214748364…

大道至简:数据库的终极未来

墨天轮2022年12月份的报告已经出炉&#xff0c;这一期的主题是&#xff1a;大道至简&#xff0c;自治为王。在公众号回复&#xff1a;下载 可以获得各期报告下载链接。数据库的终极未来是什么&#xff1f;这是行业里一直在探讨的命题&#xff0c;复杂但是也简单。大道至简01Or…

硬盘无法格式化怎么解决?数据丢失怎么恢复?

有时遇到一些特殊情况&#xff0c;需要我们对电脑磁盘进行格式化。但是硬盘无法格式化&#xff0c;这时我们应该怎么进行操作&#xff1f;你可以根据下面的操作&#xff0c;通过磁盘的创建权限&#xff0c;或者通过磁盘管理来进行格式化&#xff0c;一起来看看下面的简单操作&a…

威固的MOM,你的WOW 「 WOW 手武之道」威固巅峰技术交流赛圆满收官

近日&#xff0c;由全球特种材料公司伊士曼旗下汽车膜品牌威固&#xff08;V-KOOL&#xff09;举办的2022威固WOW手武之道技术交流会&PK赛&#xff0c;顺利收官。来自各地服务商的多位技师光芒尽显&#xff0c;展示贴装艺术&#xff0c;分别赢得广州站、南京站、郑州站及成…

WEB应用安全测试丨Acunetix功能简介

快速查找并修复使您的Web应用程序面临攻击风险的漏洞。享受更多的安心——无需花费更多有限的时间。 产品功能 发现与爬行 01、发现所有需要扫描的东西 Acunetix会自动创建所有网站、应用程序和API的列表&#xff0c;并使其保持新状态。 这意味着您不会留下未扫描且容易受到…

00后少年的心力之作(已开源) | heartt(心力算法)

心力之作: 综合性极强的文本摘要算法: heartt&#xff08;心力算法&#xff09; 大家好&#xff0c;我是 heartt 算法的作者。我今年 13 岁&#xff0c;是一名热爱编程的学习者。 今天&#xff0c;我要向大家介绍我的新算法&#xff1a;heartt。 00后少年的心力之作(已开源) | …

xxx 拘留室项目

1.项目介绍 本项目用于当地拘留室&#xff0c;定位&#xff1a;监控、值班系统&#xff0c;项目时间&#xff1a;十二月。 系统涉及人员&#xff1a;值班人员、拘留人员 设备&#xff1a;摄像头&#xff08;海康&#xff09;、门禁&#xff08;中控安防&#xff09;、声光报警…

特色功能(锐捷云桌面篇)

大家好&#xff0c;我是小杜。转眼居家办公已经一周多了&#xff0c;有没有小伙伴和小杜一样&#xff0c;感觉还是在公司上班好&#xff0c;进入状态快呢。现在的主要精力是业务上&#xff0c;处理完对应业务后&#xff0c;就开始了“快乐”的学习了。还是相信之前纯粹的学习时…