RocketMQ入门到精通
- 一、介绍
- 1.对比
- 2.基础概念
- 二、环境搭建
- 1.下载rocket
- 2.新增系统变量:ROCKETMQ_HOME
- 3.启动命名服务 nameserver
- 4.启动broker服务器
- 5.安装可视面板
- 6.手动创建Topic
- 7.手动创建消费者组
- 三、使用Springboot实现消息的收发
- 1.引入jar包
- 2.配置yml文件
- 3.编写一个生产者
- 4.编写一个消费者
- 四、RocketMQ一些基础操作
- 1.单个服务不可以出现多个同group的消费者
- 2.同步消息
- 3.异步消息
- 4.单向消息
- 5.延时消息(RocketMQ 4 版本)
- 6.延时消息(RocketMQ 5 版本)
- 7.批量消息
- 8.顺序消息-分区有序性
- 9.顺序消息-全局有序性
- 10.事务消息
- 11.发送tag消息
- 12.死信消息
- 五、RocketMQMessageListener 注解详解
- 六、RocketMQ的集群
- 1.集群搭建概述
- 1.MQ安装与配置环境变量
- 2.服务端配置文件详解与配置
- 3.启动集群
- 4.验证集群
- 5.RocketMQ集群下Springboot的配置
- 6.验证集群的使用
- 七、消息中间件的通用问题
- 1.如何保证消息的有序性
- 2.如何保证消息不丢失-生产者不丢、Broker不丢、消费者不丢
- 3.RocketMQ如何和文件系统进行高效交互存取消息
- 3.消息存储commitlog、consumerqueue、index
- 4.刷盘机制
- 5.消息积压问题
- 6.消息消费的幂等问题
一、介绍
RocketMQ 官方文档:https://rocketmq.apache.org/zh/docs/4.x/producer/04message3
RocketMQ 下载地址:https://rocketmq.apache.org/zh/download
1.对比
企业使用如果不是大数据场景,就用Rocket或者Rabbit,如果大数据场景肯定是Kafka了,Active现在已经没人用了,没啥优势。笔者之前写过一篇Rabbit的详细总结,有兴趣的可以看下:RabbitMQ入门到精通
2.基础概念
-
生产者:
生产消息,生产者启动以后会先将自己的信息注册到命名服务器中,生产者从命名服务器中获取broker的信息,然后与其进行绑定。 -
消费者:
消费消息,同理消费者也需要将自己注册到命名服务器中,他也是从命名服务器获取broker的地址,然后对其进行监听 -
broker:
消息存储的地方,他也需要向命名服务器注册自己,因为他被生产者和消费者需要 -
命名服务器:
用来管理生产者、消费者、broker的集群。三者往命名服务器注册时都有心跳机制,默认30s发送一次心跳检测。
-
Message:
生产者往Rocket中发送消息时,消息就是Message对象(Rocket是java写的)。
在Rocket中有group、topic、tag 等涉及到消息分发的概念。这里先对三个概念做个简单的介绍。 -
group 组:
用来管理topic集群,或者叫管理队列的集群的(每个topic默认有4个queue),可以将他看成队列集群的逻辑边界,假如有一个消费者服务器群组group1,里面有三个消费者,分别监听了Topic1、Topic2、Topic2,还有一个消费者服务器群组group2里面也有三个消费者,他们分别监听了Topic1、Topic2、Topic3三个主题的队列,那么他们的模型应该是类似下面这样的。
-
topic 主题:
topic主要是用于对消息的过滤使用,可以将topic和队列看成对应关系,可以理解为topic都会有他默认的队列,发送了指定的topic就会到达该topic默认的队列上了。这个概念类似于Rabbit的直连模式。加入生产者发送了一个topic为Topic1的消息(看上面的图),此时所有监听Topic1的group都会收到这个消息,group收到消息的模式是广播模式(注意Rocket可以调整消息模式,调整的不是这个),此时所有的group收到的消息都是一样的。在每个group内部如果有多个监听Topic1主题的消费者。那么每个监听Topic1主题的消费者默认是以负载均衡的方式收到消息。假如我们发送了10条topic为Topic1的消息。那么上图的模型中,group1中的每个Topic1的消费者将接收5条消息,group2中只有1个Topic1的消费者,他将接收10条消息。
总结一句话:消息到达group是广播模式这个不可更改,每个group内部是负载均衡模式将消息负载分发到不同的消费者,这个模式可以通过配置更改为广播模式。
-
tag 标签:
标签的作用是为了对消息的去向做进一步的限制,如果熟悉Rabbit可以将其看成是routing-key(但是和routing-key作用的节点是不一样的),那tag是如何约束消息的走向呢,上面已经说了消息会根据topic来找到topic对应的队列。我们如果在消息发送时指定了tag(存放于消息头中)。此时消息进入topic指定的队列时是不区分tag的,当有消费者监听队列时如果消费者指定了tag,那么只有与消费者指定的tag(消费者也可以指定多个tag)相同的消息才会被消费者获取到,所以Rocket通过tag由可以对同一个group内的消息做不同的分发,不过这个是由消费者来控制的。
-
queue
在Rocket中queue的概念不强,为什么呢?因为涉及到queue的动作都是隐式的,他被topic封装起来了,使用时我们都是以topic纬度进行使用的。其实默认情况下每个topic会有四个queue用以保存这个topic下的消息,当然这个也是可以更改的,只需要在部署broker的时候进行指定就行了。
这里对比Rabbit坐下总结下。
Rabbit消息分发时消息是在生产者和MQ服务交互时把消息的最终定向确定下来的,消费者只负责通过队列接收消息。
Rocket消息分发时消息是在MQ服务器与消费者进行确定消息去向的,只有符合消费者的topic+tag消息才会被推送给消费者,生产者发消息时只负责发就行了
可以看出Rabbit和Rocket的消息消费模型还是有很大不同的。
二、环境搭建
这部分介绍下环境的搭建过程,windows和linux区别不大。这里是windows为例。
1.下载rocket
下面是官方下载地址
下载地址:https://rocketmq.apache.org/zh/download
下载以后解压,解压位置随意,最好别有中文路径。
这里单机的搭建使用4.8的windows进行搭建,后面集群使用的是5.2.0 在linux环境下搭建。
2.新增系统变量:ROCKETMQ_HOME
新增环境变量,这里新增的是系统变量,他的值对应的是上面下载文件的解压路径,如下:
3.启动命名服务 nameserver
这里需要先配置jdk环境。也是配置一个JAVA_HOME和增加path就行。假设已经配好了
Rocket5.2.0 的搭建和这个没有任何区别,这里展示4.8.0 的搭建了,后面代码有兼顾Rocket5.2.0的新Api一起说
先进入到bin目录下,我的是D:\develop\rocketmq-all-4.8.0-bin-release\bin,然后在这里进行操作,执行以下命令
start mqnamesrv.cmd
弹出以下页面表示成功,一般只要系统变量配了,就不会起不成功。
4.启动broker服务器
先进入到bin目录下,我的是D:\develop\rocketmq-all-4.8.0-bin-release\bin,然后在这里进行操作,执行以下命令
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
这里指定的127.0.0.1:9876 是nameserver的地址,也就是nameserver的默认端口是9876.
弹出以下页面表示成功。
到这里nameserver与broker就都启动成功了。
5.安装可视面板
本文展示的可是面板只是其中 之一,官网还有一个是这个,操作没区别:https://github.com/apache/rocketmq-dashboard
这个可视面板也是apache的工程,下面是下载地址:
https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0
然后下载即可:
下载完成后(D:\develop\rocketmq-externals-release-rocketmq-console-1.0.0),本地idea打开,跳过测试maven进行编译下
编译没有问题后确认下命名空间地址对不对:
然后直接启动就行了,如果没有意外是不会有意外的
然后就可以页面访问了:http://localhost:8080/#/
这样可视面板也ok了。
6.手动创建Topic
一般我们可以开启Broker自动创建topic,但是生产一般不建议直接开启,而是自己创建,自己创建可以写java代码创建,也可以通过console来进行创建,这里展示如何使用console进行创建,进入如下位置,点击红圈进行创建
然后回弹出以下页面:
clusterName:选择你的集群就行
BROKER_NAME:每个Broker都有自己的名字,这里一般需要选择所有的Broker,这样就可以保证消息在集群的负载了
topicName:你的topic名
writeQueueNum: 用于写入消息的队列数,生产者发送消息,Broker用多少queue来进行写消息
readQueueNums:用于读消息的队列数,消费者消费消息时,可以用于读取消息的队列数,一般writeQueueNum=writeQueueNum即可
perm:一般选择6就行,6就是读写都有权限
全部填完以后commit即可。
7.手动创建消费者组
不光topic生产不推荐自动创建,消费者组也不推进自动创建,消费者的组创建也可以通过java代码或者控制台,这里展示使用控制台创建:
然后会弹出如下页面,我已经填了一部分参数,现在对各个参数进行解释:
clusterName:集群名,选择你的集群就行
brokerName:除非有特殊要求,选择所有broker就行
groupName:你的组名
consumeEnable: 开启消费
consumeBroadcastEnable:是否开启广播模式,默认是集群模式,不用开启
retryQueueNums: 用以重试的队列数
brokerId:0表示用主节点,1表示用从节点(如果有多个从节点,还有可能有2,3等)
whichBrokerWhenConsumeSlowly:当消费慢时使用哪个Broker消费,0是主,1是从
三、使用Springboot实现消息的收发
1.引入jar包
引入rocket的启动器,这个starter里用的客户端是4.6.0的客户端。其他需要依赖的web包等这里不展示了,需要自引。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
2.配置yml文件
引入了jar余下就是需要配置下yml了,配置如下:
server:
port: 8880
rocketmq:
# 命名服务地址
name-server: localhost:9876
producer:
# 生产者组
group: test-group
3.编写一个生产者
生产者代码也比较简单,使用springboot提供的模版即可轻松搞定,如下:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class ActionClass {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String doSend(){
rocketMQTemplate.convertAndSend("topicTest","hello rocketmq");
return "send";
}
}
启动这个简单的springboot工程然后调用下这个接口,如下:
然后我们去上面搭建的Rocket的可是面板看下消息:
点击上图画圈的位置就可以看到消息的明细了,如下:
这样就可以确认生产者没有问题了。
4.编写一个消费者
代码比较简单,如下:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@Component
@RocketMQMessageListener(topic = "topicTest", consumerGroup = "test-group")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到消息:" + s);
}
}
测试结果如下:
可以看到正常收到消息了,到此一个简单的收发demo就ok了。
四、RocketMQ一些基础操作
这节主要总结一些Rocket的一些基础操作。
1.单个服务不可以出现多个同group的消费者
在一个服务内部不可以出现两个相同group的消费者,不然程序无法启动,如果有两个相同group的消费者,程序启动后报错如下:
2.同步消息
Rocket支持直接发送同步消息,这样可以实时获取到发送的结果,这种适用于对于消息实时准确性要求比较高的场景,下面是同步消息的使用。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class ProductorSync {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendSync")
public String doSend(){
for (int i = 0; i < 5; i++) {
SendResult sendResult = rocketMQTemplate.syncSend("test-topic", "hello rocketmq" + i);
System.out.println("第"+i+"条消息返回结果:"+sendResult);
}
return "send";
}
}
3.异步消息
异步消息可以指定消息的回调,用以异步接受消息的返回结果,这点和Rabbit有些类似,不过Rabbit需要主动开启回调,而Rocket默认就支持回调。
此外Rabbit的回调方法是公用的,Rocket的回调方法是每个场景特有的。这里Rocket会更灵活。此外对于发送异常的回调,Rocket只返回一个异常信息,没有Rabbit异常场景下返回的更细致。总得来看Rocket更好用一些。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class ProductorAsync {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendAsync")
public String doSend(){
for (int i = 0; i < 5; i++) {
int finalI = i;
rocketMQTemplate.asyncSend("test-topic", "第" + i + "条消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("第"+ finalI +"条消息发送成功"+sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
}
return "send";
}
}
4.单向消息
何为单向消息,单向消息指消息发送时单向且不关心回调的。这种场景常用于日志收集等对于消息一致性不高的场景,此时只关心效率,对于缺失个别消息可以忍受就可以使用单向消息进行发送,无疑这种消息具有更好的并发性。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class ProductorOneWay {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendOneWay")
public String doSend(){
for (int i = 0; i < 5; i++) {
rocketMQTemplate.sendOneWay("test-topic","hello rocketmq"+i);
}
return "send";
}
}
5.延时消息(RocketMQ 4 版本)
延时消息应用场景还是很多的,Rocket的延时默认就支持,只不过他的延时是梯度支持的,并不支持任意时间,而且Rabbit的延时是基于延时消息实现的(Rabbit可以通过增加延时插件实现延时消息和延时队列两种)。当然这种情况在Rocket5.0以后得版本已经解决了(如果使用Rocket5.0以后得版本则支持任意时间的延时)。笔者这里用的是4.8的服务端,所以延时消息还是梯度的。下面是延时的梯度列表:
RocketMQ 一共支持18个等级的延迟投递,具体时间如下
那么问题来了,如果想要实现一个15分钟的延时应该怎么做呢?两种方式
- 1.先发一个10分钟的延时消息,再发一个5分钟的延时消息,这样就可以解决了
- 2.直接使用Rocket5.0 以后得版本,5.0以后已经支持任意时间的延时了,底层采用时间轮的方式实现
下面是发送一个1分钟延时消息的示例代码:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class ProductorDelay {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendDelay")
public String doSend(){
for (int i = 0; i < 5; i++) {
// 注意传的是delayLevel,5 代表的是一分钟
rocketMQTemplate.syncSend("test-topic1", MessageBuilder.withPayload("delay-message"+i).build(),3000,5);
System.out.println("send"+i+",时间:"+ LocalDateTime.now());
}
return "send";
}
}
下面展示下结果,可以看到是整整一分钟
6.延时消息(RocketMQ 5 版本)
这里说下RocketMQ5 版本的延时消息,到5以后,RocketMQ已经支持了任意时间的延时了。同时仍然也支持梯度的延时策略。在Springboot中最新的延时方法是一些新的api并未更改原先的4中的api,下面是测试代码:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class ProductorDelay {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendDelay")
public String doSend(){
for (int i = 0; i < 1; i++) {
// RocketMQ 5 的延时写法
rocketMQTemplate.syncSendDelayTimeMills("test-topic1", MessageBuilder.withPayload("delay-message-new"+i).build(),5000);
// RocketMQ 5 的延时写法
rocketMQTemplate.syncSendDelayTimeSeconds("test-topic1", MessageBuilder.withPayload("delay-message-new"+i).build(),5);
System.out.println("send"+i+",时间:"+ LocalDateTime.now());
}
return "send";
}
}
上面是一个延时5秒的例子,第一个方法单位是毫秒,第二个则是秒,效果都是一样的。下面是结果:
可以很清晰的看到,RokcetMQ5的延时消息是有时间误差的,并没有这么精准,测试了几个时间段发现,值越小时时间误差也大(这种问题的功能为什么要上线啊),使用时如果对精度要求比较高,可以选用Rabbit或者使用Rocket4 的写法,都比5的写法会好一些
7.批量消息
消息如果一条一条发如果并发较大,肯定会占用较多的线程与服务器的TCP连接,所以如果适合批量发送的场景我们还可以使用批量发送来优化发送代码,下面是一个批量发送的例子:
注意:批量发送,接收也是批量的
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class ProductorBatch {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendBatch")
public String doSend(){
List<String> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messages.add("消息:"+i);
}
// 批量发送,只有同步发送支持
SendResult sendResult = rocketMQTemplate.syncSend("test-topic1", messages);
System.out.println("批量发送结果:"+sendResult);
return "send";
}
}
下面是测试截图:
8.顺序消息-分区有序性
在Rocket中消息的顺序性需要分全局有序和分区有序来说,因为Rocket是一个topic对应多个队列的(默认一个topic为4个队列)。当生成者将消息随机投递进入topic的多个队列后,消费者获取消息自然也就无序了,这也是Rocket的默认模式,但如果想要有序该如何做呢。这里使用Springboot的Api进行演示,分区有序性的写法如下:
import cn.felord.generatecode.domain.UserMQ;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
/**
* @Author: pcc
* @Description: 顺序消费
*/
@RestController
@RequestMapping("/action")
public class ProductorOrderBy {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendOrderBy")
public String doSend(){
UserMQ userMQ1 = new UserMQ("order-1","2");
UserMQ userMQ2 = new UserMQ("order-2","2");
UserMQ userMQ3 = new UserMQ("order-3","2");
UserMQ userMQ4 = new UserMQ("order-4","3");
UserMQ userMQ5 = new UserMQ("order-5","3");
UserMQ userMQ6 = new UserMQ("order-6","3");
UserMQ userMQ7 = new UserMQ("order-7","4");
UserMQ userMQ8 = new UserMQ("order-8","4");
List<UserMQ> list = Arrays.asList(userMQ1,userMQ2,userMQ3,userMQ4,userMQ5,userMQ6,userMQ7,userMQ8);
list.forEach(item->{
rocketMQTemplate.syncSendOrderly(
"order-topic",
item.toString(),
item.getAge());
});
return "send";
}
}
上面是生产者代码,消费端也必须同时指定消费模式为顺序消费才可以(ConsumeMode.ORDERLY)。
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "test-group2",consumeMode = ConsumeMode.ORDERLY)
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("Consumer2收到消息:" + s+",时间:"+ LocalDateTime.now());
}
注意:
- 0.消费贷必须指定消费模式为顺序消费
- 1.这里使用的方法是syncSendOrderly(异步也有类似的方法asyncSendOrderly)
- 2.syncSendOrderly该方法的第三个参数为hashKey。
- 3.hashKey的作用是用来做分区划分的,相同hashKey的消息会被分配到同一个分区
- 4.字符串相等hashkey一定相当,所以上面的程序userMQ1 、userMQ2、userMQ3 一组,userMQ4、userMQ5、userMQ6 一组,userMQ7、userMQ8 一组。
- 5.此时保证的是分区有序性,所以userMQ1 、userMQ2、userMQ3的顺序应该可以保证,输出如下:
userMQ4、userMQ5、userMQ6 的顺序应该也是可以保证的,如下:
可以看到这个方法确实实现了分区有序性。特别需要关注的是hashKey的选择,一定要是一个能具有划分分区的值才可以。
9.顺序消息-全局有序性
如果明白了分区有序性,全局有序性只需要根据分区有序性进行变化一下而已:将所有的hashKey都设置为形同的常量,那么同一个topic的消息自然就会进入到同一个分区(队列)里了,如此便可以实现全局有序性,当然消费端的消费模式仍然必须是顺序消费才可以。
下面把上面例子的age属性都改成相同的值:
import cn.felord.generatecode.domain.UserMQ;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
/**
* @Author: pcc
* @Description: 顺序消费
*/
@RestController
@RequestMapping("/action")
public class ProductorOrderBy {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendOrderBy")
public String doSend(){
UserMQ userMQ1 = new UserMQ("order-1","2");
UserMQ userMQ2 = new UserMQ("order-2","2");
UserMQ userMQ3 = new UserMQ("order-3","2");
UserMQ userMQ4 = new UserMQ("order-4","2");
UserMQ userMQ5 = new UserMQ("order-5","2");
UserMQ userMQ6 = new UserMQ("order-6","2");
UserMQ userMQ7 = new UserMQ("order-7","2");
UserMQ userMQ8 = new UserMQ("order-8","2");
List<UserMQ> list = Arrays.asList(userMQ1,userMQ2,userMQ3,userMQ4,userMQ5,userMQ6,userMQ7,userMQ8);
list.forEach(item->{
rocketMQTemplate.syncSendOrderly(
"order-topic",
item.toString(),
item.getAge());
});
return "send";
}
}
这里消费端代码和上一个例子(分区有序性的消费者代码),相同就不重复展示了。
理论上面的代码将实现完全的顺序消费,也就是从1到8的顺序展示,如下图确实如此:
这样就实现了全局有序性。Rocket这里和Rabbit区别还是很大的,Rabbit是只有一个队列存储消息,所以默认就是存储有序的,不过消费有序则需要消费者自己控制了。
10.事务消息
分布式事务场景使用Rocket作为解决方案也是一个不错的选择,但是这种方案是一种最终一致性方案,做不到实时的一致,对于实时性要求不高的业务场景可以选择使用这种来解决分布式事务。下面是RocketMQ的事务消息的模型(如果对分布式事务的各种解决方案感兴趣,可以看看笔者的这篇文章:一文速通分布式事务各种解决方案):
图画的挺复杂,这里做下文字解释:
- 1.生产者先发一条半消息给MQ
- 2.MQ将半消息进行持久化,成功了给生成者回执告诉生产者MQ已经持久化了,你可以告诉我下一步是将半消息转为正式消息还是丢弃了
- 3.生成者根据本地事务状态告诉MQ半消息应该怎么处理,本地事务成功返回一个指定的状态MQ将消息转为正常消息,本地失败则发送回滚,MQ丢弃消息。本地事务状态未知告诉MQ状态未知,MQ调用生产者的补偿逻辑
- 4.生产者补偿逻辑是开发自定义的,支持的状态和上面没有区别,我们一般可以在补偿逻辑里再查一遍数据库,有的化告诉成功,没有就是失败,若是未知的场景,可以将该场景记录,人工介入处理。
使用RocketMQ的事务消息,在Springboot中需要两步:
- 第一步:注册事务监听器
事务机制用于处理半消息成功的回调,和本地回调处理异常时的处理,有如下代码import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message; /** * @Author: pcc * @Description: 注册事务监听 */ @Slf4j @RocketMQTransactionListener public class MyTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { log.info("MQ事务消息执行方法开始执行: msg:{},arg:{}", msg, arg); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("本地事务状态未知时MQ回调方法开始执行: msg:{}", msg); return RocketMQLocalTransactionState.COMMIT; } }
- 第二步:发送事务消息
下面是发送事务消息的代码
说明:import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @Author: pcc * @Description: 一句话描述该类 */ @RestController @RequestMapping("/action") public class ProductorTransaction { @Resource private RocketMQTemplate rocketMQTemplate; @GetMapping("/sendTran") public String doSend(){ rocketMQTemplate.sendMessageInTransaction("test-topic",MessageBuilder.withPayload("我是事务消息内容").build(),"111111"); return "send"; } }
这个例子里有一些需要说明的问题。- ① 必须先注册事务监听器,不同的事务在事务监听器的方法executeLocalTransaction中需要靠message和arg进行区分
- ② 事务监听器中的message就是发送事务消息的message,arg就是发送事务消息的arg参数(上面传的111111)。
- ③ 所以可以还是用message和arg来区分不同的事务,来进行不同的操作。
- ④ executeLocalTransaction返回提交的话半消息变成正常消息此时可供消费者消费了,如果executeLocalTransaction回滚的话则半消息丢弃,executeLocalTransaction返回未知的话,则MQ会在20秒左右回调checkLocalTransaction方法。
- ⑤ checkLocalTransaction方法内还可以查询数据库看看是本地事务到底有没有成功,这里返回提交或者回滚和上面逻辑一样,如果这里仍然返回未知应该人为介入了,不然MQ会一直循环调用这个check方法,这样纯属浪费资源了(这个状态一段时间后确实可以变化也可以让他轮巡,控制好次数就行)。
下面展示一个极端的场景例子,就是两个方法都返回未知时的例子的截图,如下:
11.发送tag消息
Rocket发送消息时可以携带tag或者sql条件,他们可以用于消费者消息过滤的依据,这里展示下tag的写法。注意所有send的api都支持这种tag写法,所以这里随便选了两个send方法进行展示。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class ProductorTag {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendTag")
public String doSend(){
rocketMQTemplate.convertAndSend("test-topic:tag1", "我是tag消息,我的tag是tag1");
rocketMQTemplate.send("test-topic:tag2",MessageBuilder.withPayload("我是tag消息,我的tag是tag2").build());
return "send";
}
}
下面是消费者代码,消费者可以指定多个tag,多个只需要使用 || 隔开就行
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@Component
@RocketMQMessageListener(
topic = "test-topic",
consumerGroup = "test-group1",
selectorType = SelectorType.TAG,// 这是默认值
selectorExpression = "tag1 || tag2") // 声明多个tag使用 || 隔开
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("Consumer1收到消息:" + s+",时间:"+ LocalDateTime.now());
}
}
下面是运行展示:
12.死信消息
- 死信队列特征:
- 每个组都有自己的死信队列,死信队列归属group,属于group公用,不归属任何topic,也不归属消费者
- 一个死信队列,可以包含同group下的所有topic的死信消息
- 死信队列不会默认创建和初始化,当第一个死信出现后,死信队列才会被创建
- 死信队列的消息不会被重复消费,一旦被取走,将彻底消息
- 死信队列的消息如果不被消费最多存在三天
- 当无序消息被消费失败16次后会进入死信队列
私信队列和普通队列的消费没有区别,可以设置一个消费者进行监听,根据死信消息内容进行分别处理。
五、RocketMQMessageListener 注解详解
RocketMQMessageListener 是RocketMQ SpringBoot Starter提供的一个注解,用于定义RocketMQ消费者的监听器,同时Rocket的消费者还需要实现RocketMQListener接口,同时指定消息的泛型类型,重写onMessage方法。这里唯一需要说的是RocketMQMessageListener 该注解的一些属性。
-
consumerGroup
指定消费者所属的消费组名称。消费者组是RocketMQ中用于负载均衡和消息消费的机制。同一个消费组中的消费者实例将共同消费消息,不会重复消费。group之间都是互补影响的独立单位。 -
topic
指定消费者要订阅的主题名称。消费者将从这个主题中接收消息,在broker中,默认每个topic有4个queue -
selectorType
定义消息选择器的类型。RocketMQ支持两种选择器类型:TAG(基于tag过滤消息)和SQL92(基于SQL92表达式过滤消息),tag是默认方式,如果想要使用sql过滤,还需要更改selectorType -
selectorExpression
与selectorType配合使用,定义具体的过滤表达式。如果是TAG类型,则可以指定tag;如果是SQL92类型,则可以指定SQL92表达式。假如使用的是sql模式,这个表达式可以这么写 “age > 10” (假设有一个属性是int类型的age) -
consumeMode
定义消费者的消费模式。支持两种模式:CONCURRENTLY(并发消费)和ORDERLY(顺序消费)。在顺序消费模式下,消息会按照顺序被同一个消费者实例消费。CONCURRENTLY 则会开启多线程消费消息,以提升消费速度,但是使用并发消费无法保证消息消费的顺序性,如果对顺序性有要求则需要注意了。 -
consumeThreadMax
与consumeMode配合使用,当并发消费时可用于指定消费线程数。指定消费者线程池的最大线程数。这决定了可以同时处理消息的最大并发数。 -
consumeTimeout
指定消息处理的最长等待时间。如果消息处理时间超过这个值,RocketMQ会认为消息处理失败,并可能重新投递该消息。 -
messageModel
定义消息模型。支持两种模型:CLUSTERING(集群模式,默认)和BROADCASTING(广播模式)。在集群模式下,消息只会被消费组中的一个消费者实例消费;在广播模式下,每个消费者实例都会收到消息的副本。注意这里针对的都是每个组内的消费者,消费者组接收消息都是广播的。 -
accessKey
RocketMQ服务的访问密钥,用于身份验证。 -
secretKey
RocketMQ服务的密钥,用于身份验证。 -
enableMsgTrace
是否启用消息轨迹。如果设置为true,RocketMQ会记录消息的轨迹信息。那记录了轨迹如何查看的,别急后面还有一个属性叫customizedTraceTopic -
customizedTraceTopic
自定义消息轨迹主题。如果启用消息轨迹,可以指定一个自定义的主题来存储轨迹信息。我们可以指定主题来存储和消费轨迹消息,从而拿到消息的轨迹。 -
nameServer
RocketMQ名称服务器的地址。消费者使用名称服务器来查找broker,这个一般不需要指定,直接使用默认的就行,如果想要实现一个服务内监听多个集群的Rocket,可以使用这个实现。 -
accessChannel
指定访问RocketMQ的通道。默认为LOCAL,如果使用云服务,可以设置为相应的云服务通道,因为云上可能会提供专用的访问channel,使用专用的channel可能会提升速度、安全等,也可以不用。
下面给一个多线程消费的例子:
注意:如果你用的是5.2.0 版本,consumeThreadMax 的值应该大于20,否则会报错,其他版本没有测试
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@Component
@RocketMQMessageListener(topic = "test-topic3",
consumerGroup = "test-group3",
consumeMode = ConsumeMode.CONCURRENTLY, // 多线程模式
consumeThreadMax = 10) // 使用10个线程消费
public class ConsumerThread implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("当前线程:"+Thread.currentThread().getName()+",Received message: " + message);
}
}
下面是验证结果,可以看到有10个线程在消费消息(注意这个场景下无法保证消息消费的有序性):
六、RocketMQ的集群
RocketMQ支持多种集群方式:
- 多主多从-主从同步:多个主节点,每个都有从节点,主从同步使用同步方式,速度相对异步慢一点,但集群消息可靠性更高
- 多主多从-主从异步:多个主节点,每个都有从节点,主从同步使用异步方式,速度相对同步快一点,但集群消息可靠性不如同步
- 多主:多个主节点,无从节点,一点主节点挂了就会造成集群不可用,可靠性不如上面两种。
这里展示多主多从-主从同步的方式搭建集群,也就是每个主节点都要有自己的从节点,这个和redis集群模式是很是类似的,当有主节点宕机以后就可以由从节点顶上,这样就有效的防止了集群整体不可用。
还需要说的是主从节点中,只有主节点真正在提供服务,从节点只是一个备份节点,这样的话就会涉及到一个主从的同步机制,这是任何主从模式都需要面临的问题,比如redis、比如mysql都会有这些问题。
1.集群搭建概述
在RocketMQ集群中有三个角色:NameServer 节点,Broker主节点,Broker从节点,理论上2主2从就需要6个服务器,这里使用2台服务器进行模拟搭建,搭建过程和真实6台机器没啥差距,下面是搭建的模型图,我这里使用两台机子的ip分别是:192.168.145.100、192.168.145.101
真实场景部署时记得错开主节点与从节点所在的机器(如上图),这样可以保证单服务宕机不至于出现集群故障,导致集群整个不能用。另外每个主节点都需要向所有的NameServer注册自己的信息。
注:我这里使用了静态ip+关闭了防火墙,静态ip防止后面ip变更,关闭防火墙解决后面端口不放行的问题,需要的话可以看这篇文章:静态ip极简教程
1.MQ安装与配置环境变量
这里使用RocketMQ5.2.0的版本来搭建集群,RocketMQ各个版本搭建没有区别,此外他的包也不区分linux和windows,都是一个包,只是启动时启动脚本的区别而已(因为java的特性,一次编译处处运行)。
这里再贴下下载地址:https://rocketmq.apache.org/zh/download
第一步:先将下载的RocketMQ的zip包上传到服务器,并建立相关文件夹
mkdir /app
cd /app
# 上传文件到当前文件夹下
cp /tmp/VMwareDnD/Kh0Qtx/rocketmq-all-5.2.0-bin-release.zip ./
# 解压zip
unzip -o rocketmq-all-5.2.0-bin-release.zip
# 改名为rocketmq
mv rocketmq-all-5.2.0-bin-release rocketmq
# 删除zip包
rm -rf rocketmq-all-5.2.0-bin-release.zip
# 进入rocketmq
cd ./rocketmq
# 新建文件夹 主节点使用
mkdir -p /app/rocketmq/store
mkdir -p /app/rocketmq/store/index
mkdir -p /app/rocketmq/store/commitlog
mkdir -p /app/rocketmq/store/consumequeue
# 新建文件夹 从节点使用
mkdir -p /app/rocketmq/store-slave
mkdir -p /app/rocketmq/store-slave/index
mkdir -p /app/rocketmq/store-slave/commitlog
mkdir -p /app/rocketmq/store-slave/consumequeue
第二步:更新环境变量
这里默认已经有了java环境了,然后我们需要配置rocket的环境变量,操作如下:
# 打开环境变量配置文件
vim /etc/profile
# 最底部增加一下内容
ROCKETMQ_HOME=/app/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
# 保存退出后重置环境变量
source /etc/profile
# 验证环境变量是否生效,应该输出这个:/app/rocketmq
echo $ROCKETMQ_HOME
2.服务端配置文件详解与配置
上面都处理的没问题以后,可以看下Broker的配置信息了,这个还是很重要的。这里服务端的配置需要详细说下。这里搭建的是2主2从模式,先删除不需要的配置文件,防止后面看乱了。
先cd 到 配置文件位置:
# 这是2主2从-同步的配置文件位置
cd /app/rocketmq/conf/2m-2s-sync
# 删除不需要的文件,保留如下
192.168.145.100为节点a:保留 broker-a.properties 、broker-b-s.properties
192.168.145.101为节点b:保留 broker-b.properties 、broker-a-s.properties
注意下面只是常用的配置,并不是全部,如果想要看全部配置还是得去官方文档上看,而且如果消息量特别大,这些参数都是需要压测以后才可以确定的,下面只是简单介绍了参数的作用。
-
broker-a.properties的
下面是broker-a.properties的配置内容,可以直接将以下内容复制到配置文件中(先清空再复制进去就可以)# 集群名,这里的2主2从必须都是这个名字 brokerClusterName=rocketmq-cluster # broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-a # brokerid,0就表示是Master,>0的都是表示 Slave brokerId=0 # nameServer地址,分号分割 namesrvAddr=192.168.145.100:9876;192.168.145.101:9876 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 发送消息时自动创建Broker不存在的topic的队列数,默认4个 defaultTopicQueueNums=4 # 是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 # 删除日志文件的时间点,默认凌晨四点 deleteWhen=04 # 日志文件保留时间,默认48小时 fileReservedTime=120 # commitLog文件的默认大小1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30M条 ,可根据业务情况调整 mapedFileSizeConsumeQueue=300000 # 服务器物理磁盘阈值,达到报警 diskMaxUsedSpaceRatio=90 # 存储路径 storePathRootDir=/app/rocketmq/store # commitlog存储路径 storePathCommitLog=/app/rocketmq/store/commitlog # consumequeue 存储路径 storePathConsumeQueue=/app/rocketmq/store/consumequeue # 索引存储路径 storePathIndex=/app/rocketmq/store/index # checkpoint 存储路径 storeCheckpoint=/app/rocketmq/store/checkpoint # abort 存储路径 abortFile=/app/rocketmq/store/abort # 消息大小限制 maxMessageSize=65536 # Broker 的角色:SYNC_MASTER 主从同步,ASYNC_MASTER 主从异步, brokerRole=SYNC_MASTER # 刷盘策略:ASYNC_FLUSH 异步刷盘,SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # 指定ip,防止从节点与主节点连接时异常 brokerIP1=192.168.145.100
-
broker-b-s.properties
下面是broker-b-s.properties的配置内容,a主b从不同的配置项是这六个:brokerName、brokerId、listenPort、brokerRole、存储文件(这里是6个存储文件的路径)、brokerIP1# 集群名,这里的2主2从必须都是这个名字 brokerClusterName=rocketmq-cluster # broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-b # brokerid,0就表示是Master,>0的都是表示 Slave brokerId=1 # nameServer地址,分号分割 namesrvAddr=192.168.145.100:9876;192.168.145.101:9876 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 发送消息时自动创建Broker不存在的topic的队列数,默认4个 defaultTopicQueueNums=4 # 是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 # 删除日志文件的时间点,默认凌晨四点 deleteWhen=04 # 日志文件保留时间,默认48小时 fileReservedTime=120 # commitLog文件的默认大小1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30M条 ,可根据业务情况调整 mapedFileSizeConsumeQueue=300000 # 服务器物理磁盘阈值,达到报警 diskMaxUsedSpaceRatio=90 # 存储路径 storePathRootDir=/app/rocketmq/store-slave # commitlog存储路径 storePathCommitLog=/app/rocketmq/store-slave/commitlog # consumequeue 存储路径 storePathConsumeQueue=/app/rocketmq/store-slave/consumequeue # 索引存储路径 storePathIndex=/app/rocketmq/store-slave/index # checkpoint 存储路径 storeCheckpoint=/app/rocketmq/store-slave/checkpoint # abort 存储路径 abortFile=/app/rocketmq/store-slave/abort # 消息大小限制 maxMessageSize=65536 # Broker 的角色:SYNC_MASTER 主从同步,ASYNC_MASTER 主从异步,SLAVE 从节点 brokerRole=SLAVE # 刷盘策略:ASYNC_FLUSH 异步刷盘,SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # 指定ip,防止从节点与主节点连接时异常 brokerIP1=192.168.145.100
-
broker-b.properties
下面是broker-b.properties的配置信息# 集群名,这里的2主2从必须都是这个名字 brokerClusterName=rocketmq-cluster # broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-b # brokerid,0就表示是Master,>0的都是表示 Slave brokerId=0 # nameServer地址,分号分割 namesrvAddr=192.168.145.100:9876;192.168.145.101:9876 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 发送消息时自动创建Broker不存在的topic的队列数,默认4个 defaultTopicQueueNums=4 # 是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 # 删除日志文件的时间点,默认凌晨四点 deleteWhen=04 # 日志文件保留时间,默认48小时 fileReservedTime=120 # commitLog文件的默认大小1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30M条 ,可根据业务情况调整 mapedFileSizeConsumeQueue=300000 # 服务器物理磁盘阈值,达到报警 diskMaxUsedSpaceRatio=90 # 存储路径 storePathRootDir=/app/rocketmq/store # commitlog存储路径 storePathCommitLog=/app/rocketmq/store/commitlog # consumequeue 存储路径 storePathConsumeQueue=/app/rocketmq/store/consumequeue # 索引存储路径 storePathIndex=/app/rocketmq/store/index # checkpoint 存储路径 storeCheckpoint=/app/rocketmq/store/checkpoint # abort 存储路径 abortFile=/app/rocketmq/store/abort # 消息大小限制 maxMessageSize=65536 # Broker 的角色:SYNC_MASTER 主从同步,ASYNC_MASTER 主从异步,SLAVE 从节点 brokerRole=SYNC_MASTER # 刷盘策略:ASYNC_FLUSH 异步刷盘,SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # 指定ip,防止从节点与主节点连接时异常:这是当前服务器ip brokerIP1=192.168.145.101
-
broker-a-s.properties
下面是broker-a-s.properties的配置信息# 集群名,这里的2主2从必须都是这个名字 brokerClusterName=rocketmq-cluster # broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-a # brokerid,0就表示是Master,>0的都是表示 Slave brokerId=1 # nameServer地址,分号分割 namesrvAddr=192.168.145.100:9876;192.168.145.101:9876 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 发送消息时自动创建Broker不存在的topic的队列数,默认4个 defaultTopicQueueNums=4 # 是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 # 删除日志文件的时间点,默认凌晨四点 deleteWhen=04 # 日志文件保留时间,默认48小时 fileReservedTime=120 # commitLog文件的默认大小1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30M条 ,可根据业务情况调整 mapedFileSizeConsumeQueue=300000 # 服务器物理磁盘阈值,达到报警 diskMaxUsedSpaceRatio=90 # 存储路径 storePathRootDir=/app/rocketmq/store-slave # commitlog存储路径 storePathCommitLog=/app/rocketmq/store-slave/commitlog # consumequeue 存储路径 storePathConsumeQueue=/app/rocketmq/store-slave/consumequeue # 索引存储路径 storePathIndex=/app/rocketmq/store-slave/index # checkpoint 存储路径 storeCheckpoint=/app/rocketmq/store-slave/checkpoint # abort 存储路径 abortFile=/app/rocketmq/store-slave/abort # 消息大小限制 maxMessageSize=65536 # Broker 的角色:SYNC_MASTER 主从同步,ASYNC_MASTER 主从异步,SLAVE 从节点 brokerRole=SLAVE # 刷盘策略:ASYNC_FLUSH 异步刷盘,SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # 指定ip,防止从节点与主节点连接时异常 brokerIP1=192.168.145.101
在上面的配置文件在两台机子都配置完成以后,需要略微调下Broker的启动参数,他默认是8G的JVM内存占用,虚拟机显然是不够的,这里调整如下:
vim /app/rocketmq/bin/runbroker.sh
调整如下大小,我这里改为了512、512、512
3.启动集群
第一步启动:两台机子的nameserver服务器都先启动
cd /app/rocketmq/bin
nohup sh mqnamesrv &
启动完成可以使用ps -ef|grep java 验证下是否正常
第二步启动:两台机子的主节点
cd /app/rocketmq/bin
# 第一台的主
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &
# 第二台的主
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &
启动完成可以使用ps -ef|grep java 验证下是否正常
第三步启动:两台机子的从节点
cd /app/rocketmq/bin
# 第一台的从
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
# 第二台的从
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &
4.验证集群
验证集群也简单,只需要我们通过dashboard看下就行,启动dashboard时指定nameserver的地址为这两台的nameserver就行了
然后刷新下面的页面就可以看到集群的节点信息了,如下:
到这里我们的集群配置就完成了。
5.RocketMQ集群下Springboot的配置
搭建完了Broker的集群,那在Springboot中如何使用集群呢,其实使用起来和单机差别不大,因为我们是面向命名服务的,所以只需要配置多个命名服务就行了,下面展示一个常用配置的配置项
server:
port: 8880
rocketmq:
# 命名服务地址集群
name-server: 192.168.145.100:9876;192.168.145.101:9876
producer:
# 生产者组
group: test-group
max-message-size: 4194304
# 重试其他broker,集群模式应该才有用,默认false
retry-next-server: true
# 同步失败重试次数
retry-times-when-send-failed: 3
# 异步失败重试次数
retry-times-when-send-async-failed: 3
# 发送消息超时时间
send-message-timeout: 3000
consumer:
# 默认配置,统一使用集群配置
message-model: CLUSTERING
# 消息过滤统一使用tag
selector-type: TAG
6.验证集群的使用
这里我们往集群中发送1000条消息,看下生成与消费然后监控下console控制台变化,看看是否正常(配置信息看上面一小节,完全一致)。
生产者代码:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@RestController
@RequestMapping("/action")
public class TestCluster {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendCluster")
public String doSend(){
for (int i = 0; i < 1000; i++) {
rocketMQTemplate.convertAndSend("test-topic:tag1", "我是tag消息,用来测试集群的收发消息-"+i);
}
return "send";
}
}
消费代码:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @Author: pcc
* @Description: 一句话描述该类
*/
@Component
@RocketMQMessageListener(
topic = "test-topic",
consumerGroup = "test-group1",
selectorType = SelectorType.TAG,// 这是默认值
selectorExpression = "tag1 || tag2") // 声明多个tag使用 || 隔开
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("Consumer1收到消息:" + s+",时间:"+ LocalDateTime.now());
}
}
console的结果如下:
由上图可以看到两个主节点的都收到了500条消息,集群的负载是正常的,消费也都没有问题。到这里集群搭建和使用就完全ok了
七、消息中间件的通用问题
这里说下消息中间件都需要考虑的问题
1.如何保证消息的有序性
RocketMQ中每个topic默认有4个队列,四个队列一起接收生产者发送的消息,那么Rocket是如何保证了消费者对于消息消费的顺序性呢?
这个问题在Rocket中需要区分两个概念:全局有序性、分区有序性,Rocket默认每个topic有四个队列,也可以叫4个分区。若是想要全局有序性只有一个办法,那就是只用一个分区,消费者只有一个,如此可以保证全局有序性。不过大部分场景其实不需要保证全局有序性,只需要保证分区有序性就行了。一旦保证了分区有序性我们取消息时配合顺序取就可以实现分区消息的顺序消费了,如此可以保证每个队列内部的有序(队列间无序)。
这里不重复做代码的展示了,代码参见:第四章的8,9两个小节。
2.如何保证消息不丢失-生产者不丢、Broker不丢、消费者不丢
下面是持久化的模型,各个MQ基本都是这一套模型
逻辑比较简单,这里做下介绍:
- 1.生产者发送消息给Broker
- 2.Broker收到后进行持久化到数据库中
- 3.Broker持久化成功通知生产者
- 4.消费者从Broker获取消息
- 5.消费者回调Broker告知消息已消费
- 6.生产者将消息从数据库中删除
- 7.消息重试:生产者重试支持配置同步重试次数默认都是2次、异步重试次数默认都是2次、
Broker重新发送顺序消息默认间隔1s(1s收不到顺序消息的Ack),Broker重新发送无序消息(普通、延时、事务等)默认是10s、30s、1m、2m、3m、4m、5m…2…,总共会重试16次,若是16次以后消费者仍然不能做ack,那么消息将变成死信消息。
消费者重试
这样就保证了消息不会丢失了,生产者和Broker之间主要靠发送的Ack回调,Broker主要靠持久化,消费者和Broker之间也是有一个Ack机制。所以这些共同保证了消息的可靠性。
注意:Rocket实际存储消息的地方是文件系统已经不是数据库了,数据库在消息量特别大时会成为制约MQ速度的瓶颈。
3.RocketMQ如何和文件系统进行高效交互存取消息
- 1.磁盘会预先分配连续空间,支持消息写入的顺序写,顺序写自然很快。
磁盘的预先分配空间的大小,可以通过配置文件的commitlog配置项来更改。 - 2.读取的时候快是因为使用了零拷贝技术,零拷贝技术是跳过了磁盘数据从磁盘转移到内存中的过程中的一些中间状态
使用空间换时间的理念来节省时间的消耗,内存最小要求不低于1G
3.消息存储commitlog、consumerqueue、index
- commitlog
发送到Broker的消息存储在commitlog中,也就是集群中配置文件指定的 " storePathCommitLog=/app/rocketmq/store-slave/commitlog ",这个文件会以指定的大小进行磁盘预占用申请,每次申请的大小通过 " mapedFileSizeCommitLog=1073741824 " 来指定,我们说的topic、queue都可以认为在这个文件中。他是消息持久化的地方。 - consumerqueue
Broker如何确认消费者消息消费到了哪里呢?其实每个queue都会有一个minOffset、一个maxOffset,用以标识消息的最小和最大偏移量。而消费者每次消费的位置都会通过consumerqueue来进行存储,存储时是以topic为单位来存储每个topic中每个queue的消费位置,以方便消费者中断后可以继续找寻到上次消费的位置consumerOffset。 - index
索引主要存储一些元数据,主要是队列、topic创建时间,消费者开始消费时间等等。
4.刷盘机制
RocketMQ支持两种刷盘机制
- 同步刷盘
同步刷盘是一旦有消息到达Broker,Broker会先挂起生产者的线程,先和磁盘交互,只有将消息持久化成功了,才会响应Ack,同步刷盘的优点是数据安全性、可靠性高。但是速度相对于异步刷盘来说会慢一点。 - 异步刷盘
异步刷盘就简单了,Broker收到消息立马响应Ack,此时无法保证消息持久化的可靠性(Ack以后存在丢消息的可能)。然后Broker会批量进行刷盘,这点和Redis的刷盘有些类似。异步刷盘的优点时支持的并发高,速度快,缺点是可靠性没有同步刷盘高。
5.消息积压问题
- 增加消费节点
- 消费节点采用多线程进行消费
- 如果上面措施仍不能解决,可以新启服务来消费存量消息,该支付只做消息消费,保证消费服务的性能
- 切换group的信息读取为从节点,减轻主节点压力,也就是做成读写分离
6.消息消费的幂等问题
RocketMQ自带的messageid不具有完全唯一性,不可以作为幂等消费的判定条件,如果是需要考虑幂等的场景,生产者发送消息时应该将业务主键塞到消息里,消费者最好的做法是根据业务主键来判定消息的幂等问题。下面是几种幂等的判断:
新增:不幂等,只要是新增肯定不幂等
查询:幂等
删除:幂等
更改:可能幂等,可能不幂等,set a= a+10,不幂等。