20241130 RocketMQ本机安装与SpringBoot整合

news2025/1/2 2:53:49

目录

一、RocketMQ简介

???1.1、核心概念

???1.2、应用场景

???1.3、架构设计

2、RocketMQ Server安装

3、RocketMQ可视化控制台安装与使用

4、SpringBoot整合RocketMQ实现消息发送和接收?

? ? ? ? 4.1、添加maven依赖

???4.2、yaml配置

???4.3、生产者

???4.4、消费者

???4.5、接口

???4.6、接口测试


一、RocketMQ****简介

RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历 了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

官方文档:https://rocketmq.apache.org/docs/quick-start/

github中文主页:https://github.com/apache/rocketmq/tree/master/docs/cn

1.1、核心概念

  1. Topic:消息主题,一级消息类型,生产者向其发送消息。Message:生产者向Topic发送并最终传送给消费者的数据消息的载体。
  2. 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
  3. Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。
  4. Message ID:消息的全局唯一标识,由消息队列RocketMQ系统自动生成,唯一标识某条消息。
  5. Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
  6. Producer:也称为消息发布者,负责生产并发送消息至Topic。
  7. Consumer:也称为消息订阅者,负责从Topic接收并消费消息。
  8. 分区:即Topic Partition,物理上的概念。每个Topic包含一个或多个分区。
  9. 消费位点:每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset;分区的起始位置对应的位置叫做起始位点MinOffset。
  10. Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订 阅的逻辑一致。
  11. Group ID:Group的标识。
  12. 队列:单个Topic下会由一到多个队列来存储消息。
  13. Exactly-Once****投递语义:Exactly-Once投递语义是指发送到消息系统的消息只能被Consumer处理且仅处理一次,即使Producer重试消息发送导致某消息重复投递,该消息在Consumer也只被消费 一次。
  14. 集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条 消息。
  15. 广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
  16. 定时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
  17. 延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
  18. 事务消息:RocketMQ提供类似X/Open XA的分布事务功能,通过消息队列RocketMQ的事务消息能达到分布式事务的最终一致。
  19. 顺序消息:RocketMQ提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序消息。
  20. 全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
  21. 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。
  22. 消息堆积:Producer已经将消息发送到消息队列RocketMQ的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ的服务端保存着未被消费的消息,该状态即消息堆积。
  23. 消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ的服务端完成。
  24. 消息轨迹:在一条消息从Producer发出到Consumer消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从Producer发出,经由消息队列RocketMQ服务端,投递给Consumer的完整链路,方便定位排查问题。
  25. 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ服务端的消息。
  26. 死信队列:死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中。消息队RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

1.2、应用场景

  1. 削峰填谷:诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题。
  2. 异步解耦:交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性。
  3. 顺序收发:细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ提供的顺序消息即保证消息FIFO。
  4. 分布式事务一致性:交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
  5. 大数据分析:数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
  6. 分布式缓存同步:天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化。

1.3、架构设计

RocketMQ架构上主要分为四部分,如上图所示:

**Producer:**消息发布的角色,支持分布式集群方式部署。 Producer 通过 MQ 的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

**Consumer:**消息消费的角色,支持分布式集群方式部署。支持以 push 推, pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

NameServer: NameServer 是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的zookeeper,支持 Broker 的动态注册与发现。主要包括两个功能: Broker 管理, NameServer 接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker仍然可以向其它 NameServer 同步其路由信息, Producer,Consumer仍然可以动态感知

Broker的路由的信息。BrokerServer: Broker 主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

**1. Remoting Module:**整个Broker 的实体,负责处理来自 clients端的请求。

**2. Client Manager:**负责管理客户端(Producer/Consumer) 和维护 Consumer 的 Topic订阅信息

**3. Store Service:**提供方便简单的API接口处理消息存储到物理硬盘和查询功能。

4. HA Service: 高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

5. Index Service: 根据特定的Message key 对投递到 Broker的消息进行索引服务,以提供消息的快速查询。

2、RocketMQ Server****安装

RocketMQ依赖Java环境,要求有JDK 1.8以上版本;

支持Windows和Linux平台;支持源码方式安装和使用已经编译好的安装包安装;

我们用windows平台安装RocketMQ Server编译好的安装包,来讲解RocketMQ;

下载地址:https://rocketmq.apache.org/dowloading/releases/

我们选择4.9.0二进制版本;

解压后的目录:

benchmark:里面是测试Demo;

bin:可执行脚本;conf:配置文件;

lib:依赖的jar包;

我们把rocketmq解压包放到D盘soft目录下,重命名 rocketmq ;

第一步:系统环境变量加两个配置

ROCKETMQ_HOME=“D:soft ocketmq”

NAMESRV_ADDR=“localhost:9876”

第二步:启动 Name Server

进入命令行执行:

.inmqnamesrv.cmd

第三步:启动 Broker

进入命令行执行:

.inmqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

第四步:发送和接收消息测试

进入命令行消息发送执行:

.in ools.cmd org.apache.rocketmq.example.quickstart.Producer

消息发送成功;

进入命令行消息接收执行:

.in ools.cmd org.apache.rocketmq.example.quickstart.Consumer

消息接收成功;

第五步:关闭服务

windows 下直接关闭命令行窗口即可;

3、RocketMQ****可视化控制台安装与使用

RocketMQ 提供了一些扩展项目支持,地址: https://github.com/apache/rocketmq-externals

其中一个 rocketmq - connect - console 项目,就是我们需要的可视化控制台;

我们把整个项目下载下来,打开 rocketmq - console 项目;项目是 SpringBoot 开发;

打开 application.properties 配置文件,我们至少需要修改两个配置项;

server.port=8080 ,这个是可视化项目启动端口,我们改成 19876 ;

rocketmq.config.namesrvAddr= ,这个是指定 nameServer 地址和端口,我们暂时先搞成localhost:9876,等后面搞集群的话,要再修改;

修改后保存,在项目目录下进入命令行,执行:

D:soft ocketmqClient ocketmq-externals-master ocketmq-console

mvn clean package -Dmaven.test.skip=true

打包执行完后,在 target 目录,会生成一个可运行 jar rocketmq - console - ng - 2.0.0.jar

我们运行这个 jar ,进入命令行执行:

java -jar rocketmq-console-ng-2.0.0.jar

启动成功后,浏览器输入:http://localhost:19876/

说明一切OK;

4、SpringBoot整合RocketMQ****实现消息发送和接收

4.1、添加maven依赖

org.apache.rocketmq rocketmq-spring-boot-starter 2.2.3

4.2、yaml配置

rocketmq:
name-server: localhost:9876
producer:
# 发送同一类消息的设置为同一个group,保证唯一
group: producer-group
# 发送消息超时时间,默认3000
sendMessageTimeout: 5000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
consumer:
group: consumer-group # 消费者组名

4.3、生产者

@Service
public class RocketMQProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
        System.out.println("Message sent: " + message);
    }
}

4.4、消费者

@Service
@RocketMQMessageListener(topic = "your-topic", consumerGroup = "consumer-group")
public class RocketMQConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

4.5、接口

    @GetMapping("/send")
    public String sendMessage(@RequestParam String topic, @RequestParam String message) {
        rocketMQProducer.sendMessage(topic, message);
        return "Message sent: " + message;
    }

4.6、接口测试

至此测试完成!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2267776.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【疑难杂症】 HarmonyOS NEXT中Axios库的响应拦截器无法拦截424状态码怎么办?

今天在开发一个HarmonyOS NEXT的应用的时候&#xff0c;发现http接口如果返回的状态码是424时&#xff0c;我在axios中定义的拦截器失效了。直接走到了业务调用的catch中。 问题表现&#xff1a; 我的拦截器代码如下&#xff1a; 解决办法&#xff1a; 先说解决办法&#xff…

Unity功能模块一对话系统(4)实现个性文本标签

本期我们将了解如何在TMPro中自定义我们的标签样式&#xff0c;并实现两种有趣的效果。 一.需求描述 1.定义<float>格式的标签&#xff0c;实现标签处延迟打印功能 2.定义<r" "></r>格式的标签&#xff0c;实现标签区间内文本片段的注释显示功能…

Llama 3 预训练(二)

目录 3. 预训练 3.1 预训练数据 3.1.1 网络数据筛选 PII 和安全过滤 文本提取与清理 去重&#xff08;De-duplication&#xff09; 启发式过滤&#xff08;Heuristic Filtering&#xff09; 基于模型的质量过滤 代码和数学推理数据处理 多语言数据处理 3.1.2 确定数…

电路元件与电路基本定理

电流、电压和电功率 电流 1 定义&#xff1a; 带电质点的有序运动形成电流 。 单位时间内通过导体横截面的电量定义为电流强度&#xff0c; 简称电流&#xff0c;用符号 i 表示&#xff0c;其数学表达式为&#xff1a;&#xff08;i单位&#xff1a;安培&#xff08;A&#x…

低代码开源项目Joget的研究——基本概念和Joget7社区版应用

大纲 1. 基本概念1.1 Form1.1.1 Form1.1.1.1 概述1.1.1.2 主要特点和用途1.1.1.3 创建和使用 Form1.1.1.4 示例 1.1.2 Section1.1.2.1 概述1.1.2.2 主要特点和用途1.1.2.3 示例 1.1.3 Column1.1.4 Field1.1.5 示例 1.2 Datalist1.2.1 Datalist1.2.1.1 主要特点和用途1.2.1.2 创…

【二叉树遍历 Java版】二叉树的前中后序遍历and层次遍历

二叉树的前中后序遍历and层次遍历 深度优先遍历题目链接递归前序遍历中序遍历后序遍历 迭代前序遍历后序遍历中序遍历 统一迭代前序遍历后序遍历中序遍历 广度优先遍历102. 二叉树的层序遍历107. 二叉树的层序遍历 II637. 二叉树的层平均值199. 二叉树的右视图 深度优先遍历 深…

【Sentinel】初识Sentinel

目录 1.1.雪崩问题及解决方案 1.1.1.雪崩问题 1.1.2.超时处理 1.1.3.仓壁模式 1.1.4.断路器 1.1.5.限流 1.1.6.总结 1.2.服务保护技术对比 1.3.Sentinel介绍和安装 1.3.1.初识Sentinel 1.3.2.安装Sentinel 1.4.微服务整合Sentinel 1.1.雪崩问题及解决方案 1.1.1.…

Apriori关联规则算法 HNUST【数据分析技术】(2025)

1.理论知识 Apriori是一种常用的数据关联规则挖掘方法&#xff0c;它可以用来找出数据集中频繁出现的数据集合。该算法第一次实现在大数据集上的可行的关联规则提取&#xff0c;其核心思想是通过连接产生候选项及其支持度&#xff0c;然后通过剪枝生成频繁项集。 Apriori算法的…

如何让Tplink路由器自身的IP网段 与交换机和电脑的IP网段 保持一致?

问题分析&#xff1a; 正常情况下&#xff0c;我的需求是&#xff1a;电脑又能上网&#xff0c;又需要与路由器处于同一局域网下&#xff08;串流Pico4 VR眼镜&#xff09;&#xff0c;所以&#xff0c;我是这么连接 交换机、路由器、电脑 的&#xff1a; 此时&#xff0c;登录…

系统思考—冰山模型

“卓越不是因机遇而生&#xff0c;而是智慧的选择与用心的承诺。”—— 亚里士多德 卓越&#xff0c;从来不是一次性行为&#xff0c;而是一种习惯。正如我们在日常辅导中常提醒自己&#xff1a;行为的背后&#xff0c;隐藏着选择的逻辑&#xff0c;而选择的根源&#xff0c;源…

TP5 动态渲染多个Layui表格并批量打印所有表格

记录&#xff1a; TP5 动态渲染多个Layui表格每个表格设置有2行表头&#xff0c;并且第一行表头在页面完成后动态渲染显示内容每个表格下面显示统计信息可点击字段排序一次打印页面上的所有表格打印页面上多个table时,让每个table单独一页 后端代码示例&#xff1a; /*** Nod…

【笔记】linux虚拟机与windows的文件共享之Samba服务基本配置

做完之后的总结写在最前面便于复习&#xff1a; 虚拟机上要共享的资源通过samba的操作 允许window通过网络去访问其共享资源 防止以后看不懂放在最前面 &#xff08;一&#xff09;虚拟机操作部分 下载 samba smbclient samba-common 在根目录/新建一个samba专用文件夹&…

PyTorch Instance Normalization介绍

Instance Normalization(实例归一化) 是一种标准化技术,与 Batch Normalization 类似,但它对每个样本独立地对每个通道进行归一化,而不依赖于小批量数据的统计信息。这使得它非常适合小批量训练任务以及图像生成任务(如风格迁移)。 Instance Normalization 的原理 对每…

攻防世界web新手第五题supersqli

这是题目&#xff0c;题目看起来像是sql注入的题&#xff0c;先试一下最常规的&#xff0c;输入1&#xff0c;回显正常 输入1‘&#xff0c;显示错误 尝试加上注释符号#或者–或者%23&#xff08;注释掉后面语句&#xff0c;使1后面的单引号与前面的单引号成功匹配就不会报错…

机器视觉中的单线程、多线程与跨线程:原理与应用解析

在机器视觉应用中&#xff0c;程序的运行效率直接影响到系统的实时性和稳定性。随着任务复杂度的提高&#xff0c;单线程处理往往无法满足高性能需求&#xff0c;多线程技术因此被广泛应用。此外&#xff0c;跨线程操作&#xff08;如在多线程中更新界面或共享资源&#xff09;…

JAVA学习笔记第二阶段开始 Day11 五种机制---机制1:泛型机制

JAVA基础进阶版链接 https://pdai.tech/md/java/basic/java-basic-x-generic.html 五种机制 泛型机制 用处&#xff0c;提高类型安全性和代码重用 泛型在编写代码中使用【类型占位符】&#xff0c;而不是具体的类型&#xff0c;泛型是通过“类型擦除”来实现的类型安全性&…

ZLG嵌入式笔记 | 电源设计避坑(上)

产品上量后&#xff0c;通常都会有降成需求。多年来&#xff0c;接触过不少产品降成案例&#xff0c;在电源上下刀过猛&#xff0c;引发了产品偶发性问题&#xff0c;带来了很不好的负面影响。本文将对这些案例进行总结&#xff0c;提供电源设计参考&#xff0c;确保产品降成不…

全面了解 SQL Server:功能、优势与最佳实践

SQL Server 是微软公司推出的一款关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;广泛应用于企业级数据存储、数据分析、应用开发等领域。作为全球最受欢迎的数据库管理系统之一&#xff0c;SQL Server 提供了强大的功能和工具&#xff0c;支持从小型应用到大型…

WPF TextBox 输入限制 详解

总目录 前言 通常对于WPF输入框 输入的控制无非以下方式 1 直接禁止输入(包括粘贴) 不符合要求的字符 如只可输入数字的输入框&#xff0c;当你输入字母的时候是无法输入的 2 输入后&#xff0c;校验内容是否符合要求&#xff0c;然后提示错误&#xff0c;禁止提交信息 如只可…

从0入门自主空中机器人-4-【PX4与Gazebo入门】

前言: 从上一篇的文章 从0入门自主空中机器人-3-【环境与常用软件安装】 | MGodmonkeyの世界 中我们的机载电脑已经安装了系统和常用的软件&#xff0c;这一篇文章中我们入门一下无人机常用的开源飞控PX4&#xff0c;以及ROS中无人机的仿真 1. PX4的安装 1.1 PX4固件代码的下载…