分布式消息队列RocketMQ概念详解

news2025/1/11 5:57:11

目录

1.MQ概述

1.1 RocketMQ简介

1.2 MQ用途

1.3 常见MQ产品

2.RocketMQ 基本概念

2.1 消息

2.2 主题

2.3 标签

2.4 队列

 2.5 Producer

2.6 Consumer

2.7 NameServer

2.8 Broker

2.9 RocketMQ 工作流程


1.MQ概述

1.1 RocketMQ简介

RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

1.2 MQ用途

限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

 异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

 数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

1.3 常见MQ产品

RabbitMQ
RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是Java语言开发,所以公司内部对其实现定制化开发难度较大。
Kafka
Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐量,常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。
RocketMQ
RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验,性能与稳定性非常高。其没有遵循任何常见的MQ协议,而是使用自研协议。

对比


 

2.RocketMQ 基本概念

2.1 消息

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。单个消息所占空间不会很大。

RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。 

msgId:由producer端生成,其生成规则为: producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器 
offsetMsgId:由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的偏移量) 
key:由用户指定的业务相关的唯一标识
 

2.2 主题

Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 一个生产者可以同时发送多种Topic的消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。 

2.3 标签

标签为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。 标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 Topic是消息的一级分类,Tag是消息的二级分类。Topic相当于货物,Tag相当于上海山东等地区。

2.4 队列

存储消息的物理实体。 一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。 一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。 一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。
 

分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。

 2.5 Producer

消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。  例如:用户提交的请求写入到MQ的过程,就是消息生产的过程,在这里用户就是生产者 。


 RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。如果主题中有多个队列,生产者组只有一个生产者,生产者会采取轮询的方式进行发送消息。

生产者代码如下:

导入依赖

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

 生产者代码

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer order = new DefaultMQProducer("order");
        order.setNamesrvAddr("localhost:9876");
        order.start();
        Message message = new Message("myTopic", "myTag", ("test").getBytes());
        SendResult result = order.send(message);
        System.out.println(result);
        order.shutdown();
    }

2.6 Consumer

消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。  例如:系统从MQ中读取到请求,并对请求进行处理的过程就是消息消费的过程,在这里系统就是消费者。 
 
RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。 消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着消费原Consumer消费的Queue)的目标变得非常容易。

消费者代码

  public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("myTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("收到的消息"+list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

    }


 

负载均衡策略

queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数
queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue

消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

2.7 NameServer

 NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。 
主要包括两个功能: 
Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。

路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。 


路由注册 
Name Server既然是注册中心,那么是如何完成注册的呢? NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。 那各节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着⼀个Broker列表,用来动态存储Broker的信息。 
 
Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。 


路由剔除 
由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。 NameServer中有⼀个定时任务,每隔10秒就会扫描⼀次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。 


路由发现 
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取Topic最新的路由。 默认客户端每30秒会拉取一次最新的路由。
 

2.8 Broker

Broker充当着消息中转角色,负责存储消息、转发消息。
Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。

模块如下图:

Remoting Module:整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。

Client Manager:客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如,维护Consumer的Topic订阅信息

Store Service:存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。

HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

Index Service:索引服务。根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能。

2.9 RocketMQ 工作流程

工作流程如下图:

1)启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。


2)启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每50秒向NameServer定时发送心跳包。


3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。


4) Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息。


5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/513804.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云原生:从基本概念到实践,解析演进与现状

文章目录 云原生&#xff1a;从基本概念到实践&#xff0c;解析演进与现状概念演进之路DockerKubernetesCloud NativeServerless 业界现状总结 结语 云原生&#xff1a;从基本概念到实践&#xff0c;解析演进与现状 本文仅用于简单普及&#xff0c;达到的目的是给没接触过或者很…

苹果手机无法开机?黑屏打不开怎么办?出现这种问题的解决办法分享!

各位在使用苹果手机的小伙伴有没有遇到苹果手机突然就黑屏开不了机&#xff0c;打电话也没有任何反应&#xff0c;手机也无法关机重启&#xff0c;这是什么问题呢&#xff1f;我们遇到这种问题该如何去处理呢&#xff1f; 小编今天就来跟大家说说苹果手机突然开不了机的原因以及…

【Linux命令】脚本里常用的几个命令

脚本里常用的命令 一、SORT命令1.1、语法格式1.2常用选项 二、uniq命令2.1命令格式2.2常用选项2.3小实验&#xff0c;过滤出现三次以上的IP地址 三、tr命令3.1语法格式3.2常用选项3.3实验 四、cut命令4.1语法格式4.2常用选项 五、split命令5.1语法格式5.2常用选项 六、eval七、…

在行 | “数智”为离散制造发展注入动能

在行业现场解析行业难题&#xff0c; 用主题方案创新数智价值。 制造业作为我国实体经济的主体&#xff0c;是国民经济体系的重要组成部分&#xff0c;其中以离散制造比重最大&#xff0c;是解决就业等民生问题的支柱。随着技术和经济水平的提升&#xff0c;市场对离散制造行业…

CnOpenData淘宝村淘宝镇名单数据

一、数据简介 随着电商的迅猛发展&#xff0c;以淘宝村为代表的新型城镇化不断推进。淘宝镇和淘宝村是电商巨头阿里巴巴推出的一系列支持中小企业、新创企业发展的计划&#xff0c;旨在为中小企业及创新企业提供融资、营销、培训、咨询等服务。截至2022年&#xff0c;全国涌现了…

创新案例 |探索 Tive 80% 的收入增长得益于智能物流服务、跟踪和实时可视化

您正在寻找可靠的物流解决方案吗&#xff1f; Tive 是领先的智能物流服务提供商&#xff0c;提供跟踪和实时可见性解决方案。使用 Tive&#xff0c;您可以主动监控公路、空运、海运和铁路运输。它可以帮助您减少运输问题并确保准时和全面交付&#xff0c;从而改善客户体验。 …

融合CDN和单CDN的产品对比

仅针对特定地理位置的公司可以使用单一CDN解决方案&#xff0c;建议网站内容在全球分发的优先选择融合CDN来进行加速。 如果您的网站内容/应用程序大多是静态的&#xff0c;那么单一CDN解决方案可能适合大多数市场需求&#xff1b;但如果您的流量高于平均水平&#xff0c;媒体流…

【表面缺陷检测】基于yolov5的钢板表面缺陷检测(附代码和数据集,Windows系统)

写在前面: 首先感谢兄弟们的订阅,让我有创作的动力,在创作过程我会尽最大能力,保证作品的质量,如果有问题,可以私信我,让我们携手共进,共创辉煌。 路虽远,行则将至;事虽难,做则必成。只要有愚公移山的志气、滴水穿石的毅力,脚踏实地,埋头苦干,积跬步以至千里,就…

iPhone语音备忘录删除了怎么恢复?恢复备忘录,只需3个方法!

案例&#xff1a;语音备忘录被清空 【苹果语音备忘录有我很多会议记录&#xff0c;但是被我清理手机垃圾的时候顺便清理了。有什么方法恢复回来吗&#xff1f;】 很多人都知道&#xff0c;iphone语音备忘录是使用起来非常方便的一种记录方式&#xff0c;但是如何在不小心删除备…

Python之引用

1. 引用简介与工具引入 Python 中对于变量的处理与 C 语言有着很大的不同&#xff0c;Python 中的变量具有一个特殊的属性&#xff1a;identity&#xff0c;即“身份标识”。这种特殊的属性也在很多地方被称为“引用”。 为了更加清晰地说明引用相关的问题&#xff0c;我们首…

MySQL---多表联合查询(下)(内连接查询、外连接查询、子查询(ALL/ANY/SOME/IN/EXISTS关键字)、自关联查询)

1. 内连接查询 数据准备&#xff1a; use mydb3;-- 创建部门表 create table if not exists dept3(deptno varchar(20) primary key , -- 部门号name varchar(20) -- 部门名字 );-- 创建员工表 create table if not exists emp3(eid varchar(20) primary key , -- 员工编号e…

代表Java未来的ZGC深度剖析

JAVA程序最爽的地方是它的GC机制&#xff0c;开发人员不需要关注内存申请和回收问题。同时&#xff0c;JAVA程序最头疼的地方也是它的GC机制&#xff0c;因为掌握JVM和GC调优是一件非常困难的事情。在ParallelOldGC、CMS、G1之后&#xff0c;JDK11带来的全新的「ZGC」为我们解决…

css中常用伪类表单验证:invalid、:valid、:required、以及:not 、:lang、:empty的使用

MDN文档关于伪类的相关介绍 1、 :invalid :invalid 是 CSS 伪类选择器&#xff0c;用来选择任何未通过验证的 <form>、<fieldset>、<input> 或其他表单元素。 <form class"form"><label for"email">邮箱地址:</label>…

Sqlite3 生成lib库文件

特此记录&#xff01; QT使用SQL一般有两种方式 No1&#xff0c;使用Qt内部的Sql模块 No2&#xff0c;不通过Qt的Sql模块&#xff0c;直接使用Sqlite的lib库&#xff0c;使用Sqlite的标准C/C接口就行 接下来主要针对第二种。 第一步&#xff0c;进入官网 SQLite Download P…

Google Play应用广告该如何运作

Google 应用广告是一种付费广告渠道&#xff0c;可以帮助我们把应用推向特定的目标受众。比如可以使用应用安装广告&#xff0c;用来吸引用户安装我们的应用&#xff0c;我们可以选择手动设置出价和定位&#xff0c;或使用 Google Ads 自动设置目标和出价。 Google 在创建和投…

unity3D 魔兽争霸游戏开发案例教程

文章连载更新中&#xff0c;可以提前领取素材进行预习&#xff0c;自学 素材领取&#xff1a;私信发送 领取RPG网络开发教材 这里写目录标题 游戏玩法这门课适合哪些人学习学完了能达到什么效果项目准备基础系统战斗系统同步设计精讲社交系统副本系统优化项目准备正文美术准备&…

给k8s集群添加负载均衡的能力

常识: k8s没有自带负载均衡能力, 需云服务提供商来做负载均衡, 或者自己装负载均衡控制器. 负载均衡控制器有很多, 这次装Ingress-Nginx https://kubernetes.github.io/ingress-nginx/ 文档里根据环境有很多安装方式,不要用quick start的,因为那是云环境下的. 我们的k8s是自己的…

微信小程序商品分类页最佳实践

首先我们来分析下UI小妹发来的产品原型图&#xff1a; 微信小程序商品分类页需要实现 1.单击左边的商品类目&#xff0c;右侧实现联动跳转到对应商品类目标题&#xff1b; 2.触屏拖动右侧商品列表&#xff0c;右侧跳转到对应商品类目&#xff1b; 2.分析需求我们可以把屏幕分…

使用阿里云服务器三分钟搭建网站教程(详细图文详解)

使用阿里云服务器快速搭建网站教程&#xff0c;先为云服务器安装宝塔面板&#xff0c;然后在宝塔面板上新建站点&#xff0c;阿里云服务器网以搭建WordPress网站博客为例&#xff0c;来详细说下从阿里云服务器CPU内存配置选择、Web环境、域名解析到网站上线全流程&#xff1a; …

5月编程排行榜出炉,最佳编程语言是谁?

技术的发展日新月异&#xff0c;作为开发者&#xff0c;应该时刻关注这些变化&#xff0c;不断学习才能跟上时代步伐。 编程语言层出不穷&#xff0c;关于“ 最佳编程语言 ”的争论也从未停止&#xff0c;网友们各抒己见...... 网友A&#xff1a; 人生苦短&#xff0c;我选Pyt…