Consumer的负载均衡

news2024/11/28 8:36:07

        想要提高Consumer的处理速度,可以启动多个Consumer并发处理,这个时候就涉及如何在多个Consumer之间负载均衡的问题,接下来结合源码分析Consumer的负载均衡实现。

要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个Consumer,知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。

1.DefaultMQPushConsumer的负载均衡

DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发doRebalance动作。

如图7-2所示,具体的负载均衡算法有五种,默认用的是第一种AllocateMessageQueueAveragely。负载均衡的结果与Topic的Message Queue数量,以及ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中,所以Message Queue和Consumer的数量关系,或者整除关系影响负载均衡结果。

图7-2 RocketMQ客户端负载均衡策略

以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。

2.DefaultMQPullConsumer的负载均衡

Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。

DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener函数,如代码清单7-5所示。

代码清单7-5 registerMessageQueueListener

Consumer.registerMessageQueueListener("TOPICNAME", new MessageQueue-Listener() {
public void MessageQueueChanged(String Topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) }

 

registerMessageQueueListener函数在有新的Consumer加入或退出时被触发。另一个辅助工具是MQPullConsumerScheduleService类,使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者,如代码清单7-6所示。

代码清单7-6 使用MQPullConsumerScheduleService示例

public class PullConsumerServiceTest {
    public static void main(String[] args) throws MQClientException {
        final MQPullConsumerScheduleService scheduleService = new MQPull-ConsumerScheduleService("PullConsumerService1");
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("localh-ost:9876");
        scheduleService.setMessageModel(MessageModel.CLUSTERING );
        scheduleService.registerPullTaskCallback("testPullConsumer", new PullTaskCallback() {
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer Consumer = context.getPullConsumer();
                try {
                    long Offset = Consumer.fetchConsumeOffset(mq, false);
                    if (Offset < 0)
                        Offset = 0;
                    PullResult pullResult = Consumer.pull(mq, "*", Offset, 32);
                    System.out.printf("%s%n", Offset + "\t" + mq + "\t" + pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    Consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    context.setPullNextDelayTimeMillis(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        scheduleService.start();
    }
}

 

然后我们看一看在MQPullConsumerScheduleService类的实现里,实现负载均衡的代码,如代码清单7-7所示。

代码清单7-7 MQPullConsumerScheduleService的负载均衡实现

class MessageQueueListenerImpl implements MessageQueueListener {
    @Override
    public void MessageQueueChanged(String Topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel MessageModel =
            MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
        switch (MessageModel) {
            case BROADCASTING:
                MQPullConsumerScheduleService.this.putTask(Topic, mqAll);
                break;
            case CLUSTERING :
                MQPullConsumerScheduleService.this.putTask(Topic, mqDivided);
                break;
            default:
                break;
        }
    }
}

 

从源码中可以看出,用户通过更改MessageQueueListenerImpl的实现来做自己的负载均衡策略。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1218648.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM-HotSpot虚拟机对象探秘

目录 一、对象的实例化 &#xff08;一&#xff09;创建对象的方式 &#xff08;二&#xff09;创建对象的步骤 二、对象的内存布局 &#xff08;一&#xff09;对象头 &#xff08;二&#xff09;实例数据 &#xff08;三&#xff09;对齐填充 三、 对象的访问定位 &…

绕过类安全问题分析方法

什么是绕过 逻辑漏洞是指程序设计中逻辑不严密&#xff0c;使攻击者能篡改、绕过或中断程序&#xff0c;令其偏离开发人员预期的执行。 常见表现形式 1、接口&#xff08;功能类&#xff09;绕过&#xff1a;即接口或功能中通过某参数&#xff0c;绕过程序校验 2、流程类绕…

c++虚函数纯虚函数详解加代码解释

c虚函数纯虚函数详解加代码解释 一.概念&#xff1a;二.虚函数示例及解析&#xff1a;三.纯虚函数示例及解析&#xff1a;四.验证和实际使用及解析&#xff1a;1.子类没有对父类的函数重载&#xff0c;mian()函数调用&#xff0c;是直接返回父类的值2.子类对父类的函数重载&…

一键清除磁盘垃圾工具分享,绿色快速好用

下载&#xff1a;https://download.csdn.net/download/weixin_43097956/88541564

2023年中国机动车拍卖网络化趋势加速,网络拍卖专场数量大幅上升至47489场[图]

2022年&#xff0c;由于机动车拍卖网络化趋势继续加速&#xff0c;网络拍卖专场数量大幅上升&#xff0c;全国机动车专场拍卖会高达59450场&#xff0c;较上年攀升125.31%。在389家拍卖企业中&#xff0c;举办场次超过100场的企业有27家&#xff0c;合计54850场&#xff0c;占比…

2023年中国骨质疏松治疗仪发展趋势分析:小型且智能将成为产品优化方向[图]

骨质疏松治疗仪利用磁场镇静止痛、消肿消炎的治疗作用迅速缓解患者腰背疼痛等骨质疏松临床症状。同时利用磁场的磁-电效应产生的感生电势和感生电流&#xff0c;改善骨的代谢和骨重建&#xff0c;通过抑制破骨细胞、促进成骨细胞的活性来阻止骨量丢失、提高骨密度。 骨质疏松治…

有能一键批量转换,轻松将PDF、图片转为Word/Excel的软件吗?

随着数字化时代的到来&#xff0c;OCR技术在我们的生活中变得越来越重要。无论是从图片中提取文字&#xff0c;还是将PDF、图片格式的文件转换为Word或Excel格式&#xff0c;OCR软件都能够为我们提供极大的便利。然而&#xff0c;市面上的OCR软件种类繁多&#xff0c;哪一款软件…

蓝桥杯第三周算法竞赛D题E题

发现更多计算机知识&#xff0c;欢迎访问Cr不是铬的个人网站 D迷宫逃脱 拿到题目一眼应该就能看出是可以用动态规划来解决。但是怎么定义dp呢? 这个题增加难度的点就在当所在位置与下一个要去的位置互质的时候&#xff0c;会消耗一把钥匙。当没有钥匙的时候就不能移动了。想…

802.11ax-2021协议学习__$27-HE-PHY__$27.5-Parameters-for-HE-MCSs

802.11ax-2021协议学习__$27-HE-PHY__$27.5-Parameters-for-HE-MCSs 27.3.7 Modulation and coding scheme (HE-MCSs)27.3.8 HE-SIG-B modulation and coding schemes (HE-SIG-B-MCSs)27.5 Parameters for HE-MCSs27.5.1 General27.5.2 HE-MCSs for 26-tone RU27.5.3 HE-MCSs f…

AE (1)_软件、硬件、驱动控制

#灵感# AE是个值得推敲再推敲的模块&#xff0c;有意思。 目录 相关的硬件-光圈&#xff1a; 相关的软件-曝光-ISO&#xff1a; ISP中的sensor AE 组成&#xff1a; sensor AE的流程及控制&#xff1a; 相关的硬件-光圈&#xff1a; 光圈&#xff08;F-Number&#xff0…

Equifax案例分析与合规性场景实践

在当今数字化时代&#xff0c;数据安全已经成为各个组织和企业亟待解决的问题。尤其是在数据泄露事件不断增多的背景下&#xff0c;保护敏感数据免受非法访问和泄露变得尤为紧迫。为了应对这一挑战&#xff0c;许多组织和企业开始利用密钥管理服务(KMS)来加强其数据安全性&…

KVM网络环境下vlan和trunk的理解

vmware exsi 平台&#xff0c;虚拟交换机管理界面的上行链路是什么意思 VMware ESXi中的虚拟交换机管理界面中的“上行链路”&#xff08;uplinks&#xff09;是指虚拟交换机连接到物理网络的物理网络适配器。在ESXi中&#xff0c;虚拟交换机&#xff08;vSwitch&#xff09;用…

Ubuntu18.04安装ROS系统+turtle测试

安装 1.设置安装源 sudo sh -c echo "deb http://packages.ros.org/ros/ubuntu $(lsb_release -sc) main" > /etc/apt/sources.list.d/ros-latest.list sudo sh -c . /etc/lsb-release && echo "deb http://mirrors.tuna.tsinghua.edu.cn/ros/ubun…

4.1 Windows驱动开发:内核中进程与句柄互转

在内核开发中&#xff0c;经常需要进行进程和句柄之间的互相转换。进程通常由一个唯一的进程标识符&#xff08;PID&#xff09;来标识&#xff0c;而句柄是指对内核对象的引用。在Windows内核中&#xff0c;EProcess结构表示一个进程&#xff0c;而HANDLE是一个句柄。 为了实…

不允许你还不了解指针的那些事(二)(从入门到精通看这一篇就够了)(数组传参的本质+冒泡排序+数组指针+指针数组)

目录 数组名的理解 使用指针访问数组 一维数组传参的本质 冒泡排序 二级指针 指针数组 指针数组模拟二维数组 字符指针变量 数组指针变量 二维数组传参的本质 函数指针变量 函数指针变量的创建 函数指针变量的使用 两段有趣的代码 代码一 代码二 typedef关键字 函数指针数组 …

Saas+AI?这可能是2023年最精华的6篇文章

‍ 原文太长&#xff08;6篇总计40200字&#xff09;&#xff0c;我提炼出核心要点&#xff0c;并打散重组&#xff0c;最后总计仅4500字&#xff0c;不仅是节省了大家时间&#xff0c;还能带来更多不一样的视角解读。 文章一、「AI与SaaS结合的三部曲」 &#xff08;引自8月25…

计及源荷不确定性的综合能源生产单元运行调度与容量配置随机优化模型MATLAB

主要内容 本程序复现《计及源荷不确定性的综合能源生产单元运行调度与容量配置两阶段随机优化》模型&#xff0c;采用全年光伏、风电数据通过kmeans聚类得到6种场景&#xff0c;构建了随机优化模型&#xff0c;在研究融合P2G与CCS的IEPU系统框架基础上&#xff0c;建立了各关键…

JWT登录认证(3拦截器)

Jwt登录认证&#xff08;拦截器&#xff09;&#xff1a; 使用拦截器统一验证令牌 登录和注册接口需要放行 interceptors.LoginInterceptor&#xff1a;&#xff08;注册一个拦截器&#xff09; package com.lin.springboot01.interceptors;import com.lin.springboot01.pojo.…

设计模式-中介者模式-笔记

Medicator中介者模式 动机&#xff08;Motivation&#xff09; 在软件构建过程中&#xff0c;经常会出现多个对象相互关联交际的情况&#xff0c;对象之间常常会维持一种复杂的引用关系&#xff0c;如果遇到一些需求的更改&#xff0c;这种直接的引用关系将面临不断的变化。 …

电脑监控软,电脑屏幕监控软件

电脑监控软&#xff0c;电脑屏幕监控软件 电脑屏幕监控软件不仅仅是一种工具&#xff0c;更是一种守护。随着互联网的发展&#xff0c;我们工作越来越离不开电脑&#xff0c;但同时&#xff0c;也面临着更多的安全隐患。为了保护个人隐私&#xff0c;提高工作效率&#xff0c;…