一、MQ简介
1.1 什么是MQ
消息:是MQ中最小的概念,本质就是一段数据。
队列:在MQ中使用队列的数据结构来存储消息。
MQ是把消息和队列结合起来,称为消息队列(Message Queue),是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
1.2 MQ的应用场景
应用解耦
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
串行方式注册流程:
并行方式注册流程:
异步解耦方式:
消息发送到MQ后,会立即向客户端发出响应。响应的内容通常是关于消息是否成功接收、处理状态以及可能的错误信息。
流量削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提高系统的稳定性和用户体验。 举例:业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰。
1.3 各大MQ产品比较
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
producer-consumer | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
API完备性 | 高 | 高 | 高 | 高 |
多语言支持 | 支持,Java优先 | 语言无关 | 只支持Java | 支持,Java优先 |
单机吞吐量 | 万级 | 万级 | 万级 | 十万级 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
消息丢失 | 低 | 低 | 理论上不会丢失 | 理论上不会丢失 |
消息重复 | 可控制 | 理论上会有重复 | ||
文档完备性 | 高 | 高 | 高 | 高 |
提供快速入门 | 有 | 有 | 有 | 有 |
部署难度 | 低 | 低 | 中 | |
社区活跃度 | 高 | 高 | 中 | 高 |
商业支持 | 无 | 无 | 阿里云 | 无 |
成熟度 | 成熟 | 成熟 | 成熟 | 成熟(日志领域) |
特点 | 功能齐全,大量项目使用 | 借助于erlang语言并发能力,性能高 | 各环节分布式设计,主从HA,支持上万队列,多种消费模式,性能好 | |
支行协议 | openwire,stomp,rest,xmpp,amqp | amqp | 自定义的一套(社区提供JMS--不成熟) | |
持久化 | 内存,文件,数据库 | 内存,文件 | 磁盘文件 | |
事务 | 支持 | 支持 | 支持 | |
负载均衡 | 支持 | 支持 | 支持 | |
管理界面 | 一般 | 好 | 有web console实现 | |
部署方式 | 独立,嵌入 | 独立 | 独立 | |
评价 | 优点:成熟的产品,已经在很多公司应用但规模不大,各种协议支持较好,有多重语言的客户端;缺点:其重点已放到activemq6.0产品appollo上去了,目前社区不活跃,且对5.x的维护较少;不适用于上千队列场景 | 优点:由于erlang语言的特点,产品性能较好,在互联网公司有较大规模应用,支持amqp协议,有多语言且支持amqp的客户端可用;缺点:erlang语言难度大,集群不支持动态扩展 | 优点:模型简单,接口易用,在阿里大规模应用;性能好,可大量堆积消息在broker中,支持多种消费,包括集群消费,广播消费,开发活跃度高,版本更新快;缺点:没有在mq核心中实现JMS等接口,有些系统要迁移需要修改大量代码;支持的客户端语言不多,目前是java及c++,其中c++不成熟; | 优点:性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。在日志领域比较成熟,被多家公司和多个开源项目使用;缺点:Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长;消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序;社区更新较慢; |
1.4 发展历程(了解即可,纯属乱扯)
1.5 概念术语
-
生产者和消费者
生产者负责生产消息,一般由业务系统负责生产消息,消费者即后台系统,它负责消费消息。这里的信箱就相当于是消息中间件,小明就是消息的生产者,邮递员就是消息的消费者。
-
消息模型(Message Model)
消息模型主要有队列模型和发布订阅模型,RabbitMQ采用的是队列模型,如下图所示:
RocketMQ采用发布订阅模型,模型如图所示:
-
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。就好比是我们订阅的刊物,有科技类的刊物、财经类的刊物、军事类的刊物等等。这些刊物的类型实际上就是消息的主题。
-
代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。例如中国邮政负责消息的中转。
-
名字服务(Name Server)
相当于一个管理机构。本身不做消息的存储和转发工作,名称服务管理代理服务器broker
-
生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
-
消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。
-
拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
-
推动式消费(Push Consumer
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
-
普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
-
严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
二、RocketMQ安装与配置
1.前提是Linux中安装了JDK8及以上的版本。
#查看是否安装JDK
echo $JAVA_HOME
2.官网下载安装包-
RocketMQ最新版本:5.1
下载地址:https://rocketmq.apache.org/zh/release-notes/
3.上传到linux下并安装(使用xftp工具、rz命令、或者直接拖拽)
4.将文件解压并修改名称
#将文件解压
unzip rocketmq-all-5.1.2-bin-release.zip
#修改文件名称
mv rocketmq-all-5.1.2-bin-release rocketmq-5.1.2
#文件移动
mv rocketmq-5.1.2 /usr/local
5.修改bin/runserver.sh文件
6.修改bin/runbroker.sh文件
7.修改环境变量
#打开配置文件
vim /etc/profile
#添加如下环境变量
export JAVA_HOME=/usr/local/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
export ROCKETMQ_HOME=/usr/local/rocketmq-5.1.2
export PATH=$PATH:$ROCKETMQ_HOME/bin
8.让配置生效
source /etc/profile
9.进入rocketMQ启动NameServer服务和broker服务。
#启动NameServer
nohup sh mqnamesrv &
#启动broker
nohup sh mqbroker -n localhost:9876 &
如果遇到如下问题属于正常现象,或者说一定会遇到:
[root@localhost bin]# nohup: ignoring input and appending output to ‘nohup.out’
先通过命令查看服务是否启动:
jps
只要出现以下的信息表示启动成功:
三、RocketMQ管理命令
-
启动namesrv和broker
#启动NameServer
nohup sh mqnamesrv &
#启动broker
nohup sh mqbroker -n localhost:9876 &
-
查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log #查看日志
tail -f ~/logs/rocketmqlogs/broker.log #查看日志
-
新增topic
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t topicWarning
-
查看某个topic的状态
mqadmin topicStatus -n localhost:9876 -t topicWarning
-
查看所有消费组group
mqadmin consumerProgress -n localhost:9876
-
查看所有topic
mqadmin topicList -n localhost:9876
-
删除topic
mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t topicWarning
-
关闭namesrv和broker服务
mqshutdown namesrv
mqshutdown broker
四、RocketMQ整合springboot
4.1 创建生产者工程 springboot-rocketmq-producer
1.在pom.xml文件中添加依赖
<!--rocketmq与springboot整合包-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
2.配置文件
# nameserver的地址
rocketmq.name-server=192.168.66.100:9876
#指定生产组名称
rocketmq.producer.group=my-group
3.测试类
@SpringBootTest
class DemoApplicationTests {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Test
void testSendMessage(){
//向broker发消息
/*
* boot_topic:消息的主题
* hello mq:消息的内容
*/
rocketMQTemplate.convertAndSend("boot_topic","hello mq");
}
}
4.2创建消费者springboot-rocketmq-producer工程
1.在pom.xml文件中添加依赖
<!--rocketmq与springboot整合包-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependcy>
2.配置文件
# nameserver的地址
rocketmq.name-server=192.168.66.100:9876
3.创建消费者
@Component
/**
* consumerGroup:消费组名称随便写
*/
@RocketMQMessageListener(topic = "boot_topic",consumerGroup = "boot_consumer_group")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
五、RocketMQ架构
5.1 技术架构
RocketMQ架构上主要分为四部分,如上图所示:
-
Producer:消息发布的角色,支持分布式集群方式部署。生产者将消息发送给broker时,broker会返回消息的发送状态。
-
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式消费消息。
-
NameServer:管理Broker代理服务器。向broker发送心跳,监控broker的状态。如果有broker挂掉会实时更新broker列表。同时,也会存储维护broker中的主题列表。给生产者和消费者提供最新的路由信息。
-
BrokerServer:RocketMQ的核心,负责消息的接收和转发。NameServer和BrokerServer共同构成了消息中间件。
5.2 部署架构
RocketMQ 网络部署特点:
-
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
-
Broker部署相对复杂,Broker分为Master(主)与Slave(从),一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。(之前我们在Linux上安装的是一个Master的服务,在broker配置文件broker.conf中brokerID为0的是Master主机。在一组主从broker中brokerName都是相同的。)
-
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
-
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息。
-
结合部署架构图,集群工作流程可作如下描述:
- 启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic(4.4之后的版本会自动创建Topic),创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择一个队列,与队列所在的Broker建立长连接再向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
六、RocketMQ高级特性
6.1 消息存储
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题。
同步刷盘指的是生产者一生产消息就会将消息放在broker中等待消费者消费,这期间broker是不能进行其他工作的;异步刷盘指的是消息存储在内存中,而不是直接写入磁盘。当消息被发送时,消息会被存储到内存中,并异步刷盘到磁盘中,当小费者消费完磁盘(也就是broker中的消息后)同步刷盘的可靠性高于异步刷盘但是效率要低。
生产端发送同步消息
package com.zj.Producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
public class SyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
//实例化生产者,并指明生产者所属的生产组
DefaultMQProducer producer = new DefaultMQProducer("producerGroup01");
//设置nameserver地址
producer.setNamesrvAddr("192.168.66.100:9876");
//启动生产者
producer.start();
//创建消息体,并指定topic,tag(标识商品的类型)
for (int i = 0; i < 10; i++) {
Message message = new Message("boot_topic", "tagA", "hello MQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息
SendResult send = producer.send(message);
//打印消息返回结果
System.out.println(send);
}
//消息发送完毕关闭生产者
producer.shutdown();
}
}
启动MQ服务并发送消息:
D:\java\jdk8\bin\java.exe "-javaagent:D:\Java\IntelliJ IDEA 2019.3.4\lib\idea_rt.jar=49502:D:\Java\IntelliJ IDEA 2019.3.4\bin" -Dfile.encoding=UTF-8 -classpath D:\Java\jdk8\jre\lib\charsets.jar;D:\Java\jdk8\jre\lib\deploy.jar;D:\Java\jdk8\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk8\jre\lib\ext\cldrdata.jar;D:\Java\jdk8\jre\lib\ext\dnsns.jar;D:\Java\jdk8\jre\lib\ext\jaccess.jar;D:\Java\jdk8\jre\lib\ext\jfxrt.jar;D:\Java\jdk8\jre\lib\ext\localedata.jar;D:\Java\jdk8\jre\lib\ext\nashorn.jar;D:\Java\jdk8\jre\lib\ext\sunec.jar;D:\Java\jdk8\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk8\jre\lib\ext\sunmscapi.jar;D:\Java\jdk8\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk8\jre\lib\ext\zipfs.jar;D:\Java\jdk8\jre\lib\javaws.jar;D:\Java\jdk8\jre\lib\jce.jar;D:\Java\jdk8\jre\lib\jfr.jar;D:\Java\jdk8\jre\lib\jfxswt.jar;D:\Java\jdk8\jre\lib\jsse.jar;D:\Java\jdk8\jre\lib\management-agent.jar;D:\Java\jdk8\jre\lib\plugin.jar;D:\Java\jdk8\jre\lib\resources.jar;D:\Java\jdk8\jre\lib\rt.jar;D:\Java\code\springbootcode\sb_rocketMQ_producer\target\classes;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter\2.7.0\spring-boot-starter-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot\2.7.0\spring-boot-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-context\5.3.20\spring-context-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-beans\5.3.20\spring-beans-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-expression\5.3.20\spring-expression-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-autoconfigure\2.7.0\spring-boot-autoconfigure-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter-logging\2.7.0\spring-boot-starter-logging-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\ch\qos\logback\logback-classic\1.2.11\logback-classic-1.2.11.jar;D:\Java\apache-maven-3.8.3\repositories\ch\qos\logback\logback-core\1.2.11\logback-core-1.2.11.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\logging\log4j\log4j-to-slf4j\2.17.2\log4j-to-slf4j-2.17.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\logging\log4j\log4j-api\2.17.2\log4j-api-2.17.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\slf4j\jul-to-slf4j\1.7.36\jul-to-slf4j-1.7.36.jar;D:\Java\apache-maven-3.8.3\repositories\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-core\5.3.20\spring-core-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-jcl\5.3.20\spring-jcl-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\yaml\snakeyaml\1.30\snakeyaml-1.30.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-spring-boot-starter\2.0.3\rocketmq-spring-boot-starter-2.0.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-spring-boot\2.0.3\rocketmq-spring-boot-2.0.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-client\4.5.1\rocketmq-client-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-common\4.5.1\rocketmq-common-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\commons\commons-lang3\3.12.0\commons-lang3-3.12.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-acl\4.5.1\rocketmq-acl-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-remoting\4.5.1\rocketmq-remoting-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\com\alibaba\fastjson\1.2.51\fastjson-1.2.51.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-all\4.1.77.Final\netty-all-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-buffer\4.1.77.Final\netty-buffer-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec\4.1.77.Final\netty-codec-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-dns\4.1.77.Final\netty-codec-dns-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-haproxy\4.1.77.Final\netty-codec-haproxy-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-http\4.1.77.Final\netty-codec-http-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-http2\4.1.77.Final\netty-codec-http2-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-memcache\4.1.77.Final\netty-codec-memcache-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-mqtt\4.1.77.Final\netty-codec-mqtt-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-redis\4.1.77.Final\netty-codec-redis-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-smtp\4.1.77.Final\netty-codec-smtp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-socks\4.1.77.Final\netty-codec-socks-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-stomp\4.1.77.Final\netty-codec-stomp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-codec-xml\4.1.77.Final\netty-codec-xml-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-common\4.1.77.Final\netty-common-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-handler\4.1.77.Final\netty-handler-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-handler-proxy\4.1.77.Final\netty-handler-proxy-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver\4.1.77.Final\netty-resolver-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns\4.1.77.Final\netty-resolver-dns-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport\4.1.77.Final\netty-transport-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-rxtx\4.1.77.Final\netty-transport-rxtx-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-sctp\4.1.77.Final\netty-transport-sctp-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-udt\4.1.77.Final\netty-transport-udt-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-classes-epoll\4.1.77.Final\netty-transport-classes-epoll-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-unix-common\4.1.77.Final\netty-transport-native-unix-common-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-classes-kqueue\4.1.77.Final\netty-transport-classes-kqueue-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-classes-macos\4.1.77.Final\netty-resolver-dns-classes-macos-4.1.77.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-epoll\4.1.77.Final\netty-transport-native-epoll-4.1.77.Final-linux-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-epoll\4.1.77.Final\netty-transport-native-epoll-4.1.77.Final-linux-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-kqueue\4.1.77.Final\netty-transport-native-kqueue-4.1.77.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-transport-native-kqueue\4.1.77.Final\netty-transport-native-kqueue-4.1.77.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-native-macos\4.1.77.Final\netty-resolver-dns-native-macos-4.1.77.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-resolver-dns-native-macos\4.1.77.Final\netty-resolver-dns-native-macos-4.1.77.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-classes\2.0.52.Final\netty-tcnative-classes-2.0.52.Final.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-linux-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-linux-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-osx-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-osx-aarch_64.jar;D:\Java\apache-maven-3.8.3\repositories\io\netty\netty-tcnative-boringssl-static\2.0.52.Final\netty-tcnative-boringssl-static-2.0.52.Final-windows-x86_64.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-logging\4.5.1\rocketmq-logging-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\rocketmq\rocketmq-srvutil\4.5.1\rocketmq-srvutil-4.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;D:\Java\apache-maven-3.8.3\repositories\com\google\guava\guava\19.0\guava-19.0.jar;D:\Java\apache-maven-3.8.3\repositories\commons-codec\commons-codec\1.15\commons-codec-1.15.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-messaging\5.3.20\spring-messaging-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\spring-aop\5.3.20\spring-aop-5.3.20.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-databind\2.13.3\jackson-databind-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-annotations\2.13.3\jackson-annotations-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\jackson\core\jackson-core\2.13.3\jackson-core-2.13.3.jar;D:\Java\apache-maven-3.8.3\repositories\org\springframework\boot\spring-boot-starter-validation\2.7.0\spring-boot-starter-validation-2.7.0.jar;D:\Java\apache-maven-3.8.3\repositories\org\apache\tomcat\embed\tomcat-embed-el\9.0.63\tomcat-embed-el-9.0.63.jar;D:\Java\apache-maven-3.8.3\repositories\org\hibernate\validator\hibernate-validator\6.2.3.Final\hibernate-validator-6.2.3.Final.jar;D:\Java\apache-maven-3.8.3\repositories\jakarta\validation\jakarta.validation-api\2.0.2\jakarta.validation-api-2.0.2.jar;D:\Java\apache-maven-3.8.3\repositories\org\jboss\logging\jboss-logging\3.4.3.Final\jboss-logging-3.4.3.Final.jar;D:\Java\apache-maven-3.8.3\repositories\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;D:\Java\apache-maven-3.8.3\repositories\org\projectlombok\lombok\1.18.24\lombok-1.18.24.jar com.zj.Producer.SyncProducer
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712770000, offsetMsgId=C0A8426400002A9F0000000000000317, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960001, offsetMsgId=C0A8426400002A9F00000000000003FF, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960002, offsetMsgId=C0A8426400002A9F00000000000004E7, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960003, offsetMsgId=C0A8426400002A9F00000000000005CF, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960004, offsetMsgId=C0A8426400002A9F00000000000006B7, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960005, offsetMsgId=C0A8426400002A9F000000000000079F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960006, offsetMsgId=C0A8426400002A9F0000000000000887, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712960007, offsetMsgId=C0A8426400002A9F000000000000096F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712A60008, offsetMsgId=C0A8426400002A9F0000000000000A57, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A800642BF818B4AAC27FE712A60009, offsetMsgId=C0A8426400002A9F0000000000000B3F, messageQueue=MessageQueue [topic=boot_topic, brokerName=localhost.localdomain, queueId=0], queueOffset=3]
20:04:10.038 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.66.100:10911] result: true
20:04:10.038 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.66.100:9876] result: true
Process finished with exit code 0
生产端发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。主要应用于对性能要求较高但是对可靠性不高的情况下。
package com.zj.Producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
//创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producerGroup01");
//设置nameserver地址
producer.setNamesrvAddr("192.168.66.100:9876");
//启动生产者
producer.start();
//设置发送消息失败时重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
//根据消息数量实例化倒计时器
CountDownLatch2 countDownLatch2 = new CountDownLatch2(10);
for (int i = 0; i < 10; i++) {
final int index = i;
Message message = new Message("boot_topic", "tagA","orderId111", "hello MQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch2.countDown();
System.out.println(sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
countDownLatch2.countDown();
throwable.printStackTrace();
}
});
}
//关闭producer
countDownLatch2.await(5, TimeUnit.SECONDS);
}
}
6.2 负载均衡
RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。
-
生产者的负载均衡:
如图所示,5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式 发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。
-
消费者的负载均衡:
如图所示,如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二consumer 消费 2 个队列。 这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数 量,如果 Consumer 超过队列数量,那么多余的Consumer 将不能消费消息 。
6.3 事务消息
Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示:
事务消息发送步骤如下:
-
生产者将半事务消息发送至消息队列RocketMQ服务端(也就是broker),半消息不能被消费,只有将半消息标记为正常的消息才能被消费。
-
消息队列RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
-
生产者开始执行本地事务逻辑。
-
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端不会将该消息投递给消费者,并按照如下逻辑进行回查处理:
- 在断网或者是生产者应用重启的特殊情况下,上图步骤4提交的二次确认最终未到达服务端,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照上图4步骤对半事务消息进行处理。
6.4 顺序消息
消息有序指的是按照消息的发送顺序来消费(FIFO)。RocketMQ可以保证消息有序,消息有序分为部分有序和全局有序。全局有序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
如果要实现全局顺序消息,那么只能使用一个队列,一个生产者,这会严重影响性能。
因此,我们常说的顺序消息通常指部分顺序消息。
顺序消费的原理解析,在默认的情况下消息发送会采取轮询方式把消息发送到不同的分区队列;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
6.5 消息重试
生产端重试
例如由于网络原因导致生产者发送消息到MQ失败,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。
// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);
消费端重试
同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。
-
顺序消息重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生
-
无序消息重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,可以通过设置返回状态达到消息重试的结果。
最大重试次数
消息消费失败后,可被消息队列RocketMQ重复投递的最大次数。
TCP协议无序消息重试时间间隔:
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10秒 | 9 | 7分钟 |
2 | 30秒 | 10 | 8分钟 |
3 | 1分钟 | 11 | 9分钟 |
4 | 2分钟 | 12 | 10分钟 |
5 | 3分钟 | 13 | 20分钟 |
6 | 4分钟 | 14 | 30分钟 |
7 | 5分钟 | 15 | 1小时 |
8 | 6分钟 | 16 | 2小时 |
消费失败后重新配置方式
需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
- 返回 Null
- 抛出异常
6.6 延迟消息
Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
6.7 消息查询
在实际开发中,经常需要查看MQ中消息的内容来排查问题。 生产者将消息发送到broker之后,broker会自动为消息创建一个messageID,RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。
//返回结果
SendResult [
sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0],
queueOffset=0]
-
按MessageId查询消息
Message Id 是消息发送后,在Broker端生成的,其包含了Broker的地址、偏移信息,并且会把Message Id作为结果的一部分返回。Message Id中属于精确匹配,代表唯一一条消息,查询效率更高。
-
按照Message Key查询消息
消息的key是开发人员在发送消息之前自行指定的,通常把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等。
-
按照Unique Key查询消息
除了开发人员指定的消息key,生产者在发送发送消息之前,会自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一代表一条消息。
消息在消息队列RocketMQ中存储的时间默认为3天(不建议修改),即只能查询从消息发送时间算起3天内的消息,三种查询方式的特点和对比如下表所述:
查询方式 | 查询条件 | 查询类别 | 说明 |
---|---|---|---|
按Message ID | Message ID | 精确查询 | 根据Message ID可以精确定位任意一条消息, 获取消息的属性。 |
按Message Key | Topic+Message Key | 模糊查询 | 根据Topic和Message Key可以匹配到包含指定Key 的最近64条消息。注意 建议消息生产方为每条消息 设置尽可能唯一的Key,以确保相同的Key的消息不 会超过64条,否则消息会漏查。 |
按Unique Key | Unique Key | 精确查询 | 与Message Id类似,匹配唯一一条消息, 如果生产端重复发送消息,则有可能匹配多条,但它们的unique key是唯一的。 |