RocketMQ从入门到精通

news2025/1/15 6:34:07

1.MQ概述


1.1 RocketMQ简介

 RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

1.2 MQ用途

  • 限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

  •  异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

  •  数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

1.3 常见MQ产品

  • RabbitMQ


RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是Java语言开发,所以公司内部对其实现定制化开发难度较大。

  • Kafka


Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐量,常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。

  • RocketMQ


RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双11的考验,性能与稳定性非常高。其没有遵循任何常见的MQ协议,而是使用自研协议。

对比

2.RocketMQ 基本概念


2.1 消息


消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。单个消息所占空间不会很大。

RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个MessageId(msgId),当消息到达Broker后,Broker也会自动生成一个MessageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识。 

msgId:由producer端生成,其生成规则为: producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器 
offsetMsgId:由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的偏移量) 
key:由用户指定的业务相关的唯一标识
 

2.2 主题


Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 一个生产者可以同时发送多种Topic的消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。 

2.3 标签


标签为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。 标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。 Topic是消息的一级分类,Tag是消息的二级分类。Topic相当于货物,Tag相当于上海山东等地区。

2.4 队列


存储消息的物理实体。 一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。 一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。 一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。

分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。

 2.5 Producer


消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。  例如:用户提交的请求写入到MQ的过程,就是消息生产的过程,在这里用户就是生产者 。


 RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。如果主题中有多个队列,生产者组只有一个生产者,生产者会采取轮询的方式进行发送消息。

生产者代码如下:导入依赖

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

生产者代码

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer order = new DefaultMQProducer("order");
        order.setNamesrvAddr("localhost:9876");
        order.start();
        Message message = new Message("myTopic", "myTag", ("test").getBytes());
        SendResult result = order.send(message);
        System.out.println(result);
        order.shutdown();
    }


2.6 Consumer


消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。  例如:系统从MQ中读取到请求,并对请求进行处理的过程就是消息消费的过程,在这里系统就是消费者。 
 RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。 消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着消费原Consumer消费的Queue)的目标变得非常容易。

消费者代码

public static void main(String[] args) throws MQClientException {
 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("myTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("收到的消息"+list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
 
    }

负载均衡策略

queue 个数大于 Consumer个数, 那么 Consumer 会平均分配 queue,不够平均,会根据clientId排序来拿取余数
queue个数小于Consumer个数,那么会有Consumer闲置,就是浪费掉了,其余Consumer平均分配到queue

消费者组中Consumer的数量应该小于等于订阅Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

2.7 NameServer


 NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。 
主要包括两个功能: 
Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。

路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。 


路由注册 
Name Server既然是注册中心,那么是如何完成注册的呢? NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。 那各节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护着⼀个Broker列表,用来动态存储Broker的信息。 
 
Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。 


路由剔除 
由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。 NameServer中有⼀个定时任务,每隔10秒就会扫描⼀次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除。 


路由发现 
RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取Topic最新的路由。 默认客户端每30秒会拉取一次最新的路由。
 

2.8 Broker


Broker充当着消息中转角色,负责存储消息、转发消息。
Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。

模块如下图:

Remoting Module:整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。

Client Manager:客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如,维护Consumer的Topic订阅信息

Store Service:存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。

HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

Index Service:索引服务。根据特定的Message key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能。

2.9 RocketMQ 工作流程


工作流程如下图:

1)启动NameServer,NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。


2)启动Broker时,Broker会与所有的NameServer建立并保持长连接,然后每50秒向NameServer定时发送心跳包。


3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建Topic。


4) Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息。


5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳,以确保Broker的存活状态。
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1324707.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

stack刷题

最小栈 最小栈 设计一个支持 push &#xff0c;pop &#xff0c;top 操作&#xff0c;并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈顶部…

Go 语言中并发的威力

发挥效率和响应能力 并发是现代软件开发中的一个基本概念&#xff0c;它使程序能够同时执行多个任务&#xff0c;提高效率和响应能力。在本文中&#xff0c;我们将探讨并发在现代软件开发中的重要性&#xff0c;并深入了解 Go 处理并发任务的独特方法。 在现代软件开发中并发…

C++共享和保护——(5)编译预处理命令

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 耕耘者的汗水是哺育种子成长的乳汁&am…

无约束优化问题求解笔记(1)

目录 1. 迭代求解的基本流程与停止准则1.1 迭代求解的基本流程1.2 停止准则1.3 收敛阶 2. 线搜索方法2.1 精确线搜索2.2 非精确搜索**Goldstein 准则****Wolfe 准则** 2.3 线搜索算法的收敛性 1. 迭代求解的基本流程与停止准则 1.1 迭代求解的基本流程 优化问题的解通常无法直…

智慧食堂餐卡充值文件生成器使用说明

智慧食堂餐卡充值文件生成器 下载地址&#xff1a; https://download.csdn.net/download/boysoft2002/88646277 或者百度网盘下载&#xff1a; https://pan.baidu.com/s/16cxOa5aq0CU0T0xOr2A7-A 操作使用说明 一、文件结构 下载.rar文件后&#xff0c;释放到非系统盘符的…

Labview Vision 机器视觉使用,从下载程序安装应用,到实战找硬币并输出值

1.前言 大家好,今天我要和机器人一起配合来打算 做机器视觉 用Labview 和 Vision 联动实现机器的视觉 2.下载软件-软件的安装 我们除了基础款的labview软件 还要安装视觉四件套 1.Labview 编程平台&#xff08;我是 2023 q3&#xff09; 2. NI - IMAQdx &#xff08;驱动软…

【python】程序运行添加命令行参数argparse模块用法详解

Python标准库之argparse&#xff0c;详解如何创建一个ArgumentParser对象及使用 一. argparse介绍二. 使用步骤及参数介绍三. 具体使用3.1 设置必需参数3.2 传一个参数3.3 传多个参数3.4 位置参数和可选参数3.5 参数设置默认值3.6 其它用法 一. argparse介绍 很多时候&#xff…

Python数据处理必备:Pandas DataFrame中行迭代技巧大曝光!

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在数据分析和处理中&#xff0c;Pandas是Python中最常用的库之一&#xff0c;而DataFrame是Pandas的核心数据结构之一。迭代DataFrame中的行是一种常见的操作&#xff0c;本文将详细介绍几种迭代DataFrame行的方…

AI毕业设计生成器(可生成java或python系统源码),使用Tensorflow训练的AI代码大模型

这是一个辅助生成计算机毕业设计的工具&#xff0c;可以自动完成毕业设计的源码。它基于几百个github上面开源的java和python项目&#xff0c;运用tengsorflow技术&#xff0c;训练出了AI大模型。基本实现了计算机毕业设计生成器&#xff0c;能够初步生成Java或python基本源码。…

暴雪来袭!AI智能视频监控如何保障雪天出行

随着冬季的寒潮来袭&#xff0c;多地也发出了暴雪预警&#xff0c;低温严寒加上暴雪突袭&#xff0c;AI智能视频监控如何保障雪天正常出行呢&#xff1f;小编罗列了如下几条&#xff1a; 1、道路监控系统 安防管理平台EasyCVR道路智能监控方案是通过摄像头和传感器监测道路状况…

翻译: LLMs关于人工智能的担忧 Concerns about AI

在短时间内&#xff0c;获取生成人工智能的能力已经在全球范围内传播&#xff0c;使许多人能够生成高质量的文章、图片和音频。随着这些惊人的能力的出现&#xff0c;也带来了许多关于人工智能的担忧。我认为即使在生成人工智能兴起之前&#xff0c;我们就已经生活在许多焦虑之…

AI芯片、GPU、算力、大模型

人工智能&#xff08;Artificial Intelligence&#xff09; AIGC&#xff08;Artificial Intelligence Generated Content / AI-Generated Content&#xff09; 中文译为人工智能生成内容&#xff0c;一般认为是相对于PCG&#xff08;专业生成内容&#xff09;、UCG&#xff0…

我在代码随想录|写代码Day6之 454.四数相加II ,三数之和

第一题: 454.四数相加II 题目 解答思路 我们要四数相加等于0一般的思路是暴力破解直接4个for循环,然后通过4个for循环得到答案, 但是这样的时间复杂度是O(n4),会超时然后我们通过将循环拆分,比如将 代码 第二题 : 题目: 代码 class Solution { public:vector<vector<…

【教3妹学编程-算法题】循环移位后的矩阵相似检查

插&#xff1a; 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 坚持不懈&#xff0c;越努力越幸运&#xff0c;大家一起学习鸭~~~ 3妹&#xff1a;“太阳当空照&#xff0c;花儿对我笑&…

Playground AI刚刚推出了它的新宠儿——Playground V2,去试试?

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

PyQt6 QFontDialog字体对话框控件

锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计50条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话版…

基于Antd4 和React-hooks的项目开发

基于Antd4 和React-hooks的项目开发 https://github.com/dL-hx/react-cnode 项目依赖使用 react 16.13react-redux 7.xreact-router-dom 5.xredux 4.xantd 4axiosmoment 2.24 (日期格式化)qs 项目视图说明 首页主题详情用户列表用户详情关于 配置按需加载 https://3x.an…

算法训练营Day19

#Java #二叉树 #双指针 开源学习资料 Feeling and experiences&#xff1a; 二叉搜索树的最小绝对差&#xff1a;力扣题目链接 给你一个二叉搜索树的根节点 root &#xff0c;返回 树中任意两不同节点值之间的最小差值 。 差值是一个正数&#xff0c;其数值等于两值之差的…

使用Matlab实现声音信号处理

利用Matlab软件对声音信号进行读取、放音、存储 先去下载一个声音文件&#xff1b;使用这个代码即可 clear; clc; [y, Fs] audioread(xxx.wav); plot(y); y y(:, 1); spectrogram(y); sound(y, Fs); % player audioplayer(y, Fs);y1 diff(y(:, 1)); subplot(2, 1, 1); pl…

107基于matlab的模糊推理系统(ANFIS)的时间序列预测

基于matlab的模糊推理系统&#xff08;ANFIS&#xff09;的时间序列预测&#xff0c;输出训练集、测试集和预测数据结果&#xff0c;数据可更换自己的&#xff0c;程序已调通&#xff0c;可直接运行。 107 时间序列预测模糊推理系统 (xiaohongshu.com)