文章目录
- RocketMQ
- 1 RocketMQ简介
- 2 Rocket安装
- 2.1 Rocket安装(基于Linux)
- 2.2 控制台安装
- 3 Rocket的使用
- 3.1 普通消息发送
- 3.1.1 同步消息发送
- 3.1.2 异步消息发送
- 3.1.3 单向消息发送
- 3.2 普通消息消费
- 3.2.1 集群消费
- 3.2.2 广播消费
- 3.3 收发顺序消息
- 3.3.1 全局顺序生产消费
- 3.2.2 局部顺序生产消费
- 3.4 收发延时消息
- 3.4.1 延时消息生产
- 3.4.2 延时消息消费
- 3.5 收发批量消息
- 3.5.2 批量消息的生产
- 3.5.3 批量消息的消费
- 3.6 收发过滤消息
- 3.6.1 过滤消息的生产
- 3.6.2 过滤消息的消费
- 4 Rocket常用属性和方法
- 4.1 消息生产的属性和方法
- 4.2 消息消费的属性和方法
- 5 RocketMQ高可用机制
- 5.1 集群部署模式
- 5.2 刷盘与主从同步
- 6 RocketMQ 存储结构
- 7 SpringBoot 中使用 RocketMQ
- 7.1 基本使用
- 7.2 demo
RocketMQ
1 RocketMQ简介
为什么要使用MQ?
因为项目比较大,做了分布式系统,所有远程服务调用请求都是同步执行经常出问题,所以引入了mq
作用 | 描述 |
---|---|
解耦 | 系统耦合度降低,没有强依赖关系 |
异步 | 不需要同步执行的远程调用可以有效提高响应时间 |
削峰 | 请求达到峰值后,后端service还可以保持固定消费速率消费,不会被压垮 |
RocketMQ角色
角色 | 作用 |
---|---|
Nameserver | 无状态,动态列表;这也是和zookeeper的重要区别之一。zookeeper是有状态的。 |
Producer | 消息生产者,负责发消息到Broker。 |
Broker | 就是MQ本身,负责收发消息、持久化消息等。 |
Consumer | 消息消费者,负责从Broker上拉取消息进行消费,消费完进行ack。 |
主题(Topic)
同一类消息的标识,例如某宝电商,衣服、手机、咖啡等都是一个主题,一般我们在生产或者消费时都会指定一个主题,要么生产这个主题的消息,要么订阅这个主题的消息。
分组(Group)
可以对生产者或者消费者分组,一般都针对于消费者的分组,分组是一个很有意义的事情,因为消费者都是对某一类消息的消费,消费的逻辑都是一致的,比如是一个订单主题的消息,我们可以有一个物流组来消费它,同时也可以定位一个通知组来消费这个消息。相互之间是隔离的,有可能会消费重复的消息。
消息队列(Message Queue)
是一个容器的概念,同一个主题有可能根据分组的不同,会产生不同的队列以供不同的消费组别进行消费。
标签(Tag)
更加细分的划分,在消费的时候我们可以根据 Tag 的不同来订阅不同标签消息,类似于 MySQL 查询中的条件。
偏移量(Offset)
message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费。
2 Rocket安装
2.1 Rocket安装(基于Linux)
- 官网安装
https://rocketmq.apache.org/download/
- 选择一个版本安装
-
上传解压
unzip rocketmq-all-4.9.5-bin-release.zip
-
启动 NameServer
bin/mqnamesrv
nohup sh ... & 是后台运行 ... 代表需要启动的程序 [root@huozhexiao rocketmq-all-5.0.0-bin-release]# nohup sh mqnamesrv &
-
检查日志
tail -f ~/logs/rocketmqlogs/namesrv.log
-
启动 Broker
-
首先修改 conf 文件夹下的 broker.conf 文件,设置IP。
添加一个 IP ,其中地址是当前虚拟机地址。
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1= 192.168.198.129
-
修改run下的 runbroker.sh 启动文件
-
启动
#-c 是启动时加载配置文件 #-n 是启动IP和端口号 #autoCreateTopicEnable=true 是自动创建主题(可以不写) [root@huozhexiao bin]# nohup sh mqbroker -c ../conf/broker.conf -n 192.168.198.129:9876 autoCreateTopicEnable=true &
-
查看日志
tail -f ~/logs/rocketmqlogs/broker.log
-
停止
Broker :
mqshutdown broker
Namesrv :
mqshutdown namesrv
关于源码下载window版本的可以参照关老师的博客:
https://blog.csdn.net/Dailyblue/article/details/126473743
2.2 控制台安装
-
官网下载安装包
https://github.com/apache/rocketmq-dashboard
-
配置
进入项目文件夹并修改 application.yml或application.properites配置文件(不能包含中文注释)。
server: port: 8087 servlet: encoding: charset: UTF-8 enabled: true force: true spring: application: name: rocketmq-dashboard logging: config: classpath:logback.xml rocketmq: config: # if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876 # configure multiple namesrv addresses to manage multiple different clusters namesrvAddrs: - 127.0.0.1:9876 - 127.0.0.2:9876 # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true isVIPChannel: # timeout for mqadminExt, default 5000ms timeoutMillis: # rocketmq-console's data path:dashboard/monitor dataPath: /tmp/rocketmq-console/data # set it false if you don't want use dashboard.default true enableDashBoardCollect: true # set the message track trace topic if you don't want use the default one msgTrackTopicName: ticketKey: ticket # must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required loginRequired: false useTLS: false # set the accessKey and secretKey if you used acl accessKey: # if version > 4.4.0 secretKey: # if version > 4.4.0 threadpool: config: coreSize: 10 maxSize: 10 keepAliveTime: 3000 queueSize: 5000
-
打包
在项目路径下
mvn clean package -Dmaven.test.skip=true
-
运行控制台
java -jar rocketmq-dashboard-8087.jar
3 Rocket的使用
启动 NameSrv、Broker 和 控制台。
-
引入依赖
<!-- rocketMQ 客户端依赖 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.0.0</version> </dependency>
3.1 普通消息发送
消息发送步骤
- 创建消息生产者 producer,并指定生产者组名
- 指定 NameSrv 地址
- 启动 producer
- 创建消息对象,指定 Topic、Tag 和消息体
- 发送消息
- 关闭生产者 producer
3.1.1 同步消息发送
同步消息即请求立马有响应,但是可能会发生线程阻塞(即上一个消息迟迟没有响应)
@Slf4j
public class 同步消息发送 {
@SneakyThrows
public static void main(String[] args) {
// 1.创建消息生产者 producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("huozhexiao-a");
// 2.指定 NameSrv 地址
producer.setNamesrvAddr("192.168.198.129:9876");
//延时
producer.setSendMsgTimeout(1000000);
// 3.启动 producer
producer.start();
// 4.创建消息对象,指定 Topic、Tag 和消息体
for (int i = 1; i < 20; i++) {
String body = "Hello,rocketMQ,index:" + i;
String topic = "huozhexiao";
String tag = "a";
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
// 5.发送消息
SendResult result = producer.send(message);
log.info("回调的消息是:{}", result);
}
// 6.关闭生产者 producer
producer.shutdown();
}
}
3.1.2 异步消息发送
异步消息发送,即请求不会立即响应,同步异步通过监听的方式来获取。 所以在异步发送时,并不会阻塞当前线程。
@Slf4j
public class 异步消息发送 {
@SneakyThrows
public static void main(String[] args) {
// 1.创建消息生产者 producer,并指定生产者组名
DefaultMQProducer producer =new DefaultMQProducer("huozhexiao-a");
// 2.注册中心地址
producer.setNamesrvAddr("192.168.198.129:9876");
producer.setSendMsgTimeout(1000000);
// 3. 启动
producer.start();
// 4. 消息信息
for (int i = 1; i < 20; i++) {
String body = "你好,宝鸡!索引:" + i;
String topic = "huozhexiao";
String tag = "b";
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
//5.发送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
log.info("回调的消息是:{}", result);
}
public void onException(Throwable throwable) {
log.info("失败回调的消息是:{}", throwable.getMessage());
}
});
Thread.sleep(500);
}
//6. 关闭消息
producer.shutdown();
}
}
3.1.3 单向消息发送
单向消息不用管是否有响应,只关心发送
@Slf4j
public class 单向消息发送 {
@SneakyThrows
public static void main(String[] args) {
// 1.创建消息生产者 producer,并指定生产者组名
DefaultMQProducer producer =new DefaultMQProducer("huozhexiao-a");
// 2.注册中心地址
producer.setNamesrvAddr("192.168.198.129:9876");
producer.setSendMsgTimeout(1000000);
// 3. 启动
producer.start();
for (int i = 1; i < 11; i++) {
String body = "你好,宝鸡!索引:" + i;
String topic = "huozhexiao";
String tag = "c";
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
// 唯一的区别,这里调用 sendOneway
producer.sendOneway(message);
}
//关闭生产者
producer.shutdown();
}
}
三者的特点和主要区别如下:
发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 | 适用场景 |
---|---|---|---|---|
同步发送 | 快 | 有 | 不丢失 | 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。 |
异步发送 | 快 | 有 | 不丢失 | 异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。 |
单向发送 | 最快 | 无 | 可能丢失 | 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。 |
3.2 普通消息消费
消息队列是基于发布/订阅模型的消息系统。消费者,即消息的订阅方订阅关注的 Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列约定以下概念:
- 集群:使用相同 Group ID 的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括 Tag 的使用)。
- 集群消费:当使用集群消费模式时,消息队列认为任意一条消息只需要被集群内(单 Zone 内)的任意一个消费者处理即可。
- 广播消费:当使用广播消费模式时,消息队列会将每条消息推送给集群内(单 Zone 内)所有注册过的消费者,保证消息至少被每个消费者消费一次。
消息消费步骤
- 创建消费者 Consumer,指定消费者组名
- 指定 NameSrv 地址
- 订阅主题 Topic 和 Tag
- 设置回调函数,处理消息
- 启动消费者 Consumer
3.2.1 集群消费
适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。
public class 集群消费 {
@SneakyThrows
public static void main(String[] args) {
// 创建消费者 Consumer,指定消费者组名
DefaultMQPushConsumer consumer =new DefaultMQPushConsumer("huozhexiao-b");
// 指定 NameSrv 地址
consumer.setNamesrvAddr("192.168.198.129:9876");
// 订阅主题 Topic 和 Tag
consumer.subscribe("huozhexiao","*");
// 集群消费模式 可以不设置,默认就是集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
// 循环接受所有消息
for (MessageExt message : list) {
// 分别获取 Topic、Tags 和 消息内容
String topic = message.getTopic();
String tags = message.getTags();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + msg);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 代表消息消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 代表消息被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者 Consumer
consumer.start();
}
}
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。
3.2.2 广播消费
适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。
与集群消费不同的是设置广播消费即可
consumer.setMessageModel(MessageModel.BROADCASTING);
public class 广播消费 {
@SneakyThrows
public static void main(String[] args) {
// 创建消费者 Consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huozhexiao-b");
// 指定 NameSrv 地址
consumer.setNamesrvAddr("192.168.198.129:9876");
// 订阅主题 Topic 和 Tag
consumer.subscribe("huozhexiao", "*");
// 集群消费模式 可以不设置,默认就是集群消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
// 循环接受所有消息
for (MessageExt message : list) {
// 分别获取 Topic、Tags 和 消息内容
String topic = message.getTopic();
String tags = message.getTags();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + msg);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 代表消息消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 代表消息被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者 Consumer
consumer.start();
}
}
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 广播模式下不支持线下联调分组消息。
- 每条消息都需要被相同订阅逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
- 广播模式下,消息队列保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 广播模式下服务端不维护消费进度,所以消息队列控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
3.3 收发顺序消息
顺序消息(FIFO 消息)是消息队列提供的一种严格按照顺序来发布和消费的消息类型。可以被分为:
- 全局顺序消息
- 部分顺序消息
顺序消费的原理解析:
-
默认的情况下消息发送会采取 Round Robbin 轮询方式把消息发送到不同的 queue (分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序。
-
但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。
-
当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
3.3.1 全局顺序生产消费
RocketMQ为什么没有办法保证顺序消费
答:①RocketMQ默认有四个分区,轮询把消息发送到四个分区队列,消费时需要从四个分区队列拉取消息
②原来的消费ConsumeConcurrentlyStatus.RECONSUME_LATER
:无法保证消息是否消费成功,需要重试
-
全局顺序生产
@Slf4j public class 全局顺序生产 { @SneakyThrows public static void main(String[] args) { // 1.创建消息生产者 producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("huozhexiao-a"); // 2.指定 NameSrv 地址 producer.setNamesrvAddr("192.168.198.129:9876"); //延时 producer.setSendMsgTimeout(1000000); // 3.启动 producer producer.start(); // 4.创建消息对象,指定 Topic、Tag 和消息体 for (int i = 1; i < 20; i++) { String body = "Hello,rocketMQ,index:" + i; String topic = "huozhexiao"; String tag = "a"; Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8)); // 5.发送消息 producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { //代表只发送到一个队列中 return list.get(0); } },null); } // 6.关闭生产者 producer producer.shutdown(); } }
-
全局顺序消费
@Slf4j public class 全局顺序消费 { @SneakyThrows public static void main(String[] args) { //创建消费者 Consumer,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huozhexiao-a"); //指定 NameSrv 地址 consumer.setNamesrvAddr("192.168.198.129:9876"); //订阅主题 Topic 和 Tag consumer.subscribe("huozhexiao", "a"); // 设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { // 接收消费(消费消息,订阅消息) try { list.forEach((me) -> { String topic = me.getTopic(); String tag = me.getTags(); String body = new String(me.getBody()); log.info("topic:{},tag:{},body:{}", topic, tag, body); }); } catch (Exception e) { // 需要注意:这里使用这个标注失败,意思是先等一会,一会在处理这批消息,而不是放到重试队列中 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); // 启动消费者 Consumer consumer.start(); } }
3.2.2 局部顺序生产消费
-
局部顺序生产(订单案例)
假设五个分区有五个状态:分别为已付款,未付款,已发货,未发货,已完成
- bean的Order类
@Data @AllArgsConstructor @NoArgsConstructor public class Order implements Serializable { private Integer id; private String name; private Integer type; //1:未付款 2:已付款 3:未发货 4:已发货 5:已完成 }
- 局部顺序生产
@Slf4j public class 局部顺序生产 { @SneakyThrows public static void main(String[] args) { // 1.创建消息生产者 producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("huozhexiao-a"); // 2.指定 NameSrv 地址 producer.setNamesrvAddr("192.168.198.129:9876"); producer.setSendMsgTimeout(10000000); // 3.启动 producer producer.start(); // 4.创建消息对象,指定 Topic、Tag 和消息体 for (int i = 1; i < 21; i++) { Order order = new Order(); order.setId(i); order.setName("订单:" + i); order.setType((int) (Math.random() * 4 + 1)); String body = JSONArray.toJSONString(order); String topic = "huozhexiao"; String tag = "a"; Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8)); // 5.发送消息 producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { // 将消息发送到第一个队列中 return list.get(order.getType() - 1); } }, null); } // 6.关闭生产者 producer producer.shutdown(); } }
-
局部顺序消费
@Slf4j public class 局部顺序消费 { @SneakyThrows public static void main(String[] args) { //创建消费者 Consumer,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huozhexiao-a"); //指定 NameSrv 地址 consumer.setNamesrvAddr("192.168.198.129:9876"); //订阅主题 Topic 和 Tag consumer.subscribe("huozhexiao", "a"); // 设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { // 接收消费(消费消息,订阅消息) try { list.forEach((me) -> { String topic = me.getTopic(); String tag = me.getTags(); String body = new String(me.getBody()); Order order = JSONArray.parseObject(body, Order.class); if (order.getType() == 1) { log.info("{}",order); } }); } catch (Exception e) { return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); // 启动消费者 Consumer consumer.start(); } }
3.4 收发延时消息
在电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
3.4.1 延时消息生产
@Slf4j
public class 延时消息生产 {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("huozhexiao-a");
producer.setNamesrvAddr("192.168.198.129:9876");
producer.setSendMsgTimeout(100000);
producer.start();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
for (int i = 1; i < 20; i++) {
String body = "Hello,rocketMQ,index:" + i + "," + System.currentTimeMillis();
String topic = "huozhexiao";
String tag = "a";
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
message.setDelayTimeSec(10);
// 5.发送消息
SendResult result = producer.send(message);
log.info("回调的消息是:{}", result);
}
producer.shutdown();
}
}
3.4.2 延时消息消费
@Slf4j
public class 延时消息生产 {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("huozhexiao-a");
producer.setNamesrvAddr("192.168.198.129:9876");
producer.setSendMsgTimeout(100000);
producer.start();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
for (int i = 1; i < 20; i++) {
String body = "Hello,rocketMQ,index:" + i + "," + System.currentTimeMillis();
String topic = "huozhexiao";
String tag = "a";
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
message.setDelayTimeSec(10);
// 5.发送消息
SendResult result = producer.send(message);
log.info("回调的消息是:{}", result);
}
producer.shutdown();
}
}
3.5 收发批量消息
- 批量发送消息能显著提高传递小消息的性能。
- 但是同一批次的消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。
- 此外,这一批消息的总大小不应超过1MB。
waitStoreMsgOK:表示发送消息后,是否需要等待消息同步刷新到磁盘上。如果broker配置为ASYNC_MASTER,那么只需要消息在master上刷新到磁盘即可;如果配置为SYNC_MASTER,那么还需要等待slave也刷新到磁盘。需要注意的是,waitStoreMsgOK默认为false,只有将设置为true的情况下,才会等待刷盘成功再返回。
3.5.2 批量消息的生产
@Slf4j
public class 批量消息生产 {
@SneakyThrows
public static void main(String[] args) {
// 1.创建消息生产者 producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("huozhexiao-a");
// 2.指定 NameSrv 地址
producer.setNamesrvAddr("192.168.198.129:9876");
// 3.启动 producer
producer.start();
// 4.创建消息对象,指定 Topic、Tag 和消息体
List<Message> list = new ArrayList<>();
for (int i = 1; i < 20; i++) {
String body = "Hello,rocketMQ,index:" + i + "," + System.currentTimeMillis();
String topic = "huozhexiao";
String tag = "a";
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
list.add(message);
}
// 批量消息生产
producer.send(list);
// 6.关闭生产者 producer
producer.shutdown();
}
}
3.5.3 批量消息的消费
public class 批量消息消费 {
@SneakyThrows
public static void main(String[] args) {
// 创建消费者 Consumer,指定消费者组名
DefaultMQPushConsumer consumer =new DefaultMQPushConsumer("huozhexiao-b");
// 指定 NameSrv 地址
consumer.setNamesrvAddr("192.168.198.129:9876");
// 订阅主题 Topic 和 Tag
consumer.subscribe("huozhexiao","*");
// 集群消费模式 可以不设置,默认就是集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
// 循环接受所有消息
for (MessageExt message : list) {
// 分别获取 Topic、Tags 和 消息内容
String topic = message.getTopic();
String tags = message.getTags();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("topic:" + topic + ",tags:" + tags + ",msg:" + msg);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 代表消息消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 代表消息被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者 Consumer
consumer.start();
}
}
3.6 收发过滤消息
利用TAG过滤消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huozhexiao-a");
consumer.subscribe("GuanWei", "TagA || TagB || TagC");
SQL 特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。
SQL 基本语法
-
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
数值比较,比如:>,>=,<,<=,BETWEEN,=。
字符比较,比如:=,<>,IN。
IS NULL 或者 IS NOT NULL。
逻辑符号 AND,OR,NOT。 -
常量支持类型为:
数值,比如:123,3.1415。
字符,比如:‘abc’,必须用单引号包裹起来。
NULL,特殊的常量。
布尔值,TRUE 或 FALSE。
3.6.1 过滤消息的生产
@Slf4j
public class 过滤消息生产 {
@SneakyThrows
public static void main(String[] args) {
// 1.创建消息生产者 producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("huozhexiao-a");
// 2.指定 NameSrv 地址
producer.setNamesrvAddr("192.168.198.129:9876");
// 3.启动 producer
producer.start();
// 4.创建消息对象,指定 Topic、Tag 和消息体
for (int i = 1; i < 20; i++) {
int random = (int) (Math.random() * 100 + 1);
String gender = (random % 2 == 0 ? "男" : "女");
String body = "Hello,rocketMQ,index:" + i + "," + random + "," + gender;
String topic = "huozhexiao";
String tag = "a";
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
// 设置过滤条件
message.putUserProperty("random", String.valueOf(random));
message.putUserProperty("gender", gender);
// 5.发送消息
SendResult result = producer.send(message);
log.info("回调的消息是:{}", result);
}
// 6.关闭生产者 producer
producer.shutdown();
}
}
3.6.2 过滤消息的消费
用 MessageSelector.bySql 方法来使用 sql 筛选消息。
@Slf4j
public class 过滤消息消费 {
@SneakyThrows
public static void main(String[] args) {
//创建消费者 Consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huozhexiao-b");
//指定 NameSrv 地址
consumer.setNamesrvAddr("192.168.198.129:9876");
//订阅主题 Topic 和 Tag
consumer.subscribe("huozhexiao", MessageSelector.bySql("random>50 and gender<>'男'"));
// 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 接收消费(消费消息,订阅消息)
list.forEach((me) -> {
String topic = me.getTopic();
String tag = me.getTags();
String body = new String(me.getBody());
log.info("topic:{},tag:{},body:{}", topic, tag, body);
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者 Consumer
consumer.start();
}
}
4 Rocket常用属性和方法
4.1 消息生产的属性和方法
属性
//1.设置生产者组
DefaultMQProducer producer = new DefaultMQProducer( producerGroup:"huozhexiao-a");
//2.设置指定主题中每一个Broker 分区队列的数量,默认为4(仅对新主题起作用)
producer.setDefaultTopicQueueNums(4);
//3.设置发送消息默认超时时间,默认3s
producer.setSendMsgTimeout(3000);
//4.设置消息体超过该值时启用压缩,默认4K
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
//5.设置同步发送消息失败后重试次数,默认2次,总共发送3次
producer.setRetryTimesWhenSendFailed(2);
//6.设置异步发送消息失败后重试次数,默认2次,总共发送3次
producer.setRetryTimesWhenSendAsyncFailed(2);
// 7.设置消息重试时,选择另一个Broker中,默认是false
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
//8.设置允许发送的最大消息长度,默认是4M
producer.setMaxMessageSize(1024 * 1024 * 4);
//9.设置NameSrv的地址,多个使用;隔开
producer.setNamesrvAddr("192.168.198.129:9876;192.168.198.129:9877");
方法
- 单向发送方法
//1.启动生产者
producer.start();
//2.关闭生产者
producer.shutdown();
//3.查找该主题下的所有消息队列
List<MessageQueue> list = producer.fetchPublishMessageQueues( topic: "huozhexiao");
//4.单向发送
//4.1发送单向消息
producer.sendOneway(message);
//4.2指定队列发送单向消息
producer.sendOneway(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//这个index就是mqs队列中的索引号int index = 1;
//指定发送到第二个队列中
return mqs.get(index);
}
},null)
//4.3另一种指定队列发送方式(list是查找出来的队列)
producer.sendOneway(message, list.get(1));
- 同步发送方法
//5.同步发送
//5.1发送同步消息
SendResult result1 = producer.send(message);
// 5.2 设置超时时间同属性设置中的 setSendMsgTimeout 默认3000毫秒
SendResult result2 = producer.send(message, timeout: 3000);
//5.3指定队列发送同步消息
SendResult result3 = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//这个index就是mqs队列中的索引号int index = 1;
//指定发送到第二个队列中
return mqs.get(index);
}, null);
// 5.4另一种指定队列发送方式(list是查找出来的队列)
producer.send(message, list.get(1));CSDN @Dailyblue
- 异步发送方法
//6.异步发送
//6.1异步发送消息
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {}
public void onException(Throwable e) {}
});
//6.2设置超时时间默认3000毫秒
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult){}
public void onException(Throwable e) {}
},timeout: 3000);
//6.3指定队列发送异步消息
producer.send(message, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
}, new SendCallback(){
public void onSuccess(SendResult sendResult) {}
public void onException(Throwable e) {}
});
//6.4另一种指定队列发送方式
producer.send(message, list.get(0), new SendCallback() {
public void onSuccess(SendResult sendResult) {}
public void onException(Throwable e) {}
});
4.2 消息消费的属性和方法
属性
//常见属性设置
//1.设置消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("huozhexiao-a");
// 2.指定NameSrvc
onsumer.setNamesrvAddr("192.168.147.88:9876");
// 3.订阅主题和Tags(也可以书写SQL92)
consumer.subscribe("huozhexiao","TagA || TagB");
//4.设置消费模式(集群CLUSTERING|广播BROADCASTING)默认集群
consumer.setMessageModel(MessageModel.CLUSTERING);
//5.设置消费开始偏移量(上次消费偏移量、最大偏移量、最小偏移量和启动时间戳)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//6.设置消费者最小线程数量默认20
consumer.setConsumeThreadMax(20);
//7.设置消费者最大线程数量默认20
consumer.setConsumeThreadMin(20);
// 8.设置每次从 Broker 拉取消息的间隔,单位为毫秒
consumer.setPullInterval(10000);
//9.设置每次从 Broker 拉取消息的条数默认32条
consumer.setPullBatchSize(32);
//10.设置消息重试次数-1代表16次
consumer.setMaxReconsumeTimes(-1);
//11.设置消息超时时间默认15MIN单位MIN
consumer.setConsumeTimeout(15);
方法
- 订阅相关方法
//1.获取消费者对指定主题分配了那些队列
Set<MessageQueue> set = consumer.fetchSubscribeMessageQueues( topic: "huozhexiao");
//2.订阅
//2.1基于主题的订阅,消息过滤使用表达式(TagA|TagB|TagC)
consumer.subscribe("huozhexiao","*"); // subExpression: "TagA"、 "TagA|TagB"
//2.2基于主题的定于,使用SQL
consumer.subscribe("huozhexiao", MessageSelector.bySql("age between 18 and 40"));
// 2.3基于主题的定于,消息过滤使用表达式
consumer.subscribe(topic: "huozhexiao", MessageSelector.byTag("TagA|TagB"));
//2.4取消订阅指定主题
consumer.unsubscribe( topic:"huozhexiao");
-
注册监听器相关方法
-
并发事件监听器
//3.注册监听器
//3.1注册并发事件监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
//msgs:消息集合
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
//获取消息集合并遍历
for (MessageExt msg : msgs) {
//3.1.1获取消息主题
String topic = msg.getTopic();
//3.1.2获取消息Tags
String tags = msg.getTags();
//3.1.3获取消息内容
byte[] bytes = msg.getBody();
//3.1.4获取队列ID
int queueId = msg.getQueueId();
//3.1.5获取消费消息时间
Long storeTime = msg.getStoreTimestamp();
// 3.1.6 获取属性名SQL
String propertyName = msg.getUserProperty( name: "propertyName");
}
} catch (Exception e) {
e.printStackTrace();
//3.1.7消费消息失败,到重试队列中重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//3.1.8成功消费消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
- 顺序事件监听器
//3.2注册顺序事件监听器基本方法都一致
consumer.registerMessageListener(new MessageListenerOrderLy() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
try {
//msgs:消息集合
//获取消息集合并遍历
for (MessageExt msg:msgs){
//3.2.1获取消息主题
String topic = msg.getTopic();
//3.2.2获取消息Tags
String tags = msg.getTags();
//3.2.3获取消息内容
byte[] bytes = msg.getBody();
//3.2.4获取队列ID
int queueId = msg.getQueueId();
//3.2.5获取消费消息时间
Long storeTime = msg.getStoreTimestamp();
//3.2.6获取属性名
String propertyName = msg.getUserProperty( name: "propertyName");
}
} catch (Exception e) {
e.printStackTrace();
//3.2.7消费消息失败,请稍等,过会继续消费不会进入重试队列
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
//3.2.8成功消费消息return ConsumeOrderlyStatus.SUCCESS;
}
});
RocketMQ 的 ACK(Acknowledgement) 机制
ACK,即一种消息确认字符,在数据通信中,消息接受站给消息发送站的一种传输类控制符,表示传输过来的数据已经接受无误。
PushConsumer 为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ 才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才会认为这批消息(默认是1条)是消费完成的。
如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,RocketMQ 就会认为这批消息消费失败了。
为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker,在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。
5 RocketMQ高可用机制
5.1 集群部署模式
-
单 master 模式
单机模式, 即只有一个Broker, 如果Broker宕机了, 会导致RocketMQ服务不可用, 不推荐使用
-
多 master 模式
组成一个集群, 集群每个节点都是Master节点, 配置简单, 性能也是最高, 某节点宕机重启不会影响RocketMQ服务
缺点:如果某个节点宕机了, 会导致该节点存在未被消费的消息在节点恢复之前不能被消费
-
多 master 多 slave 模式(同步)
与多Master多Slave模式,异步复制方式基本一致,唯一不同的是消息复制采用同步方式,只有master和slave都写成功以后,才会向客户端返回成功
优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点就:是会降低消息写入的效率,并影响系统的吞吐量
-
多 master 多 slave 模式(异步)
每个Master配置一个Slave, 多对Master-Slave, Master与Slave消息采用异步复制方式, 主从消息一致只会有毫秒级的延迟
优点:是弥补了多Master模式(无slave)下节点宕机后在恢复前不可订阅的问题。在Master宕机后, 消费者还可以从Slave节点进行消费。采用异步模式复制,提升了一定的吞吐量。总结一句就是,采用多Master多Slave模式,异步复制模式进行部署,系统将会有较低的延迟和较高的吞吐量
缺点就是如果Master宕机, 磁盘损坏的情况下, 如果没有及时将消息复制到Slave, 会导致有少量消息丢失
为什么会有主从节点?
答:master和master之间的集群,集群的是不同种类的消息,但是当一个master不可用(宕机)时,这个master存放的数据也不可用,会对生产造成极大危害,所以产生了从从节点,用来备份主节点的内容
RocketMQ 分布式集群是通过 Master 和 Slave 的配合达到高可用性的。
- Master 和 Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为0表明这个 Broker 是 Master,大于0表明这个 Broker 是 Slave。
- Master 角色的 Broker 支持读和写,Slave 角色的Broker仅支持读,也就是 Producer 只能和 Master 角色的 Broker 连接写入消息;Consumer 可以连接 Master 角色的 Broker,也可以连接 Slave 角色的 Broker 来读取消息。
消息消费高可用
在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候,Consumer 会被自动切换到从 Slave 读。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 程序。这就达到了消费端的高可用性。
消息发送高可用
在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上,这样当一个 Broker 组的 Master 不可用后,其他组的 Master 仍然可用,Producer 仍然可以发送消息。 RocketMQ 目前还不支持把 Slave 自动转成 Master,如果机器资源不足, 需要把 Slave 转成 Master,则要手动停止 Slave 角色的 Broker,更改配置文件,用新的配置文件启动 Broker。
5.2 刷盘与主从同步
- 同步刷盘和异步刷盘
RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时候,有两种写磁盘方式:
异步刷盘方式:
在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入。
- 优点:性能高
- 缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致
同步刷盘方式:
在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。
优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致
缺点:性能比异步的低
同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH, ASYNC_FLUSH中的一个。
- 同步复制和异步复制
同步复制方式是等 Master 和 Slave 均写成功后才反馈给客户端写成功状态,在同步复制方式下,如果 Master 出故障, Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
异步复制方式是只要 Master 写成功 即可反馈给客户端写成功状态,在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果 Master 出了故障,有些数据因为没有被写入 Slave,有可能会丢失。
同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、 SYNC_MASTER、SLAVE(从节点配置)三个值中的一个。
- 三个值的说明:
SYNC_MASTER
是同步方式,Master角色 Broker 中的消息要立刻同步过去。
ASYNC_MASTER
是异步方式,Master 角色 Broker 中的消息通过异步处理的方式同步到 Slave 角色的机器上。
SLAVE
表明当前是从节点,无需配置 brokerRole。
6 RocketMQ 存储结构
-
Commit log存储消息实体。顺序写,随机读。
为什么要顺序写,随机读?
[ 磁盘存储的“快”——顺序写 ]
磁盘存储,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度,目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度。[ 磁盘存储的“慢”——随机写 ]
磁盘的随机写的速度只有100KB/s,和顺序写的性能差了好几个数量级。[ 存储机制这样设计的好处——顺序写,随机读 ]
- CommitLog 顺序写,可以大大提高写入的效率;虽然是随机读,但是利用 package 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。
- 为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构,因为 ConsumeQueue 里只存储偏移量信息,所以尺寸是有限的。
在实际情况中,大部分 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。
-
ConsumeQueue 文件
ConsumeQueue (逻辑消费队列)可以看成基于 topic 的 commitLog 的索引文件。因 > 为 CommitLog 是按照顺序写入的,不同的 topic 消息都会混淆在一起,而 Consumer 又是按照 topic 来消费消息的,这样的话势必会去遍历 commitLog 文件来过滤 topic,这样性能肯定会非常差,所以 RocketMQ 采用 ConsumeQueue 来提高消费性能。即每个 Topic 下的每个 queueId 对应一个 ConsumeQueue,其中存储了单条消息对应在 commitLog 文件中的物理偏移量 offset,消息大小 size,消息 Tag 的 hash 值。
-
IndexFile 文件
因为所有的消息都存在 CommitLog 中,如果要实现根据 key 查询 消息的方法,就会变得非常困难,所以为了解决这种业务需求,有了 IndexFile 的存在。用于为生成的索引文件提供访问服务,通过消息 Key 值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为400M,一个 IndexFile 可以保存 2000W个索引。
7 SpringBoot 中使用 RocketMQ
7.1 基本使用
- 分别引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
- 配置文件
# RocketMQ 相关配置
rocketmq:
# 指定 nameServer
name-server: 192.168.198.129:9876
# Producer 生产者
producer:
group: huozhexiao-a # 指定发送者组名
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
- RocketMQTemplate的使用
private RocketMQTemplate rocketMQTemplate;
/**
* 普通字符串消息
*/
public void sendMessage() {
String json = "普通消息";
rocketMQTemplate.convertAndSend("sendMessage", json);
}
/**
* 同步消息
*/
public void syncSend() {
Message message1 = new Message();
message1.setTopic("GUANWEI");
message1.setTags("TagA");
message1.setBody(message.getBytes(StandardCharsets.UTF_8));
SendResult sendMessage = rocketMQTemplate.syncSend("sendMessage", message);
System.out.println(sendMessage);
}
/**
* 异步消息
*/
public void asyncSend() {
Message message1 = new Message();
message1.setTopic("GUANWEI");
message1.setTags("TagA");
message1.setBody(message.getBytes(StandardCharsets.UTF_8));
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("123");
}
@Override
public void onException(Throwable throwable) {
System.out.println("456");
}
};
rocketMQTemplate.asyncSend("sendMessage", message, callback);
}
/**
* 单向消息
*/
public void onewaySend() {
Message message1 = new Message();
message1.setTopic("GUANWEI");
message1.setTags("TagA");
message1.setBody(message.getBytes(StandardCharsets.UTF_8));
rocketMQTemplate.sendOneWay("sendMessage", message);
}
// ------------------------------------------------------------------------------
// 另一种写法
/**
* 同步消息
*/
public void syncSend() {
SendResult sendMessage = rocketMQTemplate.syncSend("topic:tag", "同步消息");
System.out.println(sendMessage);
}
/**
* 异步消息
*/
public void asyncSend() {
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("123");
}
@Override
public void onException(Throwable throwable) {
System.out.println("456");
}
};
rocketMQTemplate.asyncSend("topic:tag", "异步消息", callback);
}
/**
* 单向消息
*/
public void onewaySend() {
rocketMQTemplate.sendOneWay("topic:tag", "单向消息");
}
- @RocketMQMessageListener 监听器
1. consumerGroup 消费者分组
2. topic 主题
3. selectorType 消息选择器类型
默认值 SelectorType.TAG 根据TAG选择
仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息
SelectorType.SQL92 根据SQL92表达式选择
关键字:
AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL
数据类型:
Boolean, like: TRUE, FALSE
String, like: ‘abc’
Decimal, like: 123
Float number, like: 3.1415
语法:
AND, OR
>, >=, <, <=, =
BETWEEN A AND B, equals to >=A AND <=B
NOT BETWEEN A AND B, equals to >B OR <A
IN ('a', 'b'), equals to ='a' OR ='b', this operation only support String type.
IS NULL, IS NOT NULL, check parameter whether is null, or not.
=TRUE, =FALSE, check parameter whether is true, or false.
样例:
(a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
1
4. selectorExpression 选择器表达式
默认值 ”*“
5. consumeMode 消费模式
默认值 ConsumeMode.CONCURRENTLY 并行处理
ConsumeMode.ORDERLY 按顺序处理
6. messageModel 消息模型
默认值 MessageModel.CLUSTERING 集群
MessageModel.BROADCASTING 广播
7. consumeThreadMax 最大线程数
默认值 64
8. consumeTimeout 超时时间
默认值 30000ms
9. accessKey
默认值 ${rocketmq.consumer.access-key:}
10. secretKey
默认值 ${rocketmq.consumer.secret-key:}
11. enableMsgTrace 启用消息轨迹
默认值 true
12. customizedTraceTopic 自定义的消息轨迹主题
默认值 ${rocketmq.consumer.customized-trace-topic:}
没有配置此配置项则使用默认的主题
13. nameServer 命名服务器地址
默认值 ${rocketmq.name-server:}
14. accessChannel
默认值 ${rocketmq.access-channel:}
7.2 demo
本案例用到,订单,物流,库存三个服务,库存减少时生产订单并生成物流信息…
-
分别引入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency>
-
配置文件
# RocketMQ 相关配置 rocketmq: # 指定 nameServer name-server: 192.168.198.129:9876 # Producer 生产者 producer: group: huozhexiao-a # 指定发送者组名 send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。 compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。 retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。 retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
-
订单服务
本服务用于生成订单…
- Controller层
@RestController @RequestMapping("/order") public class OrderController { @Resource private RocketMQTemplate rocketMQTemplate; @GetMapping public JSONResult a() { Order order = new Order(); order.setId(1); order.setName("饮料"); order.setPrice(18188.0); order.setNum(4); //同步消息 topic:tag rocketMQTemplate.syncSend("ORDER:add", JSONArray.toJSONString(order)); return new JSONResult<>(true, 200, order, "订单加入成功!!!"); } }
-
仓库服务
本服务用于监听订单情况,以扣除库存
- Listener(监听订单)
/** * 生成订单后,根据用户确认,扣除库存 * + 用于监听订单生成后,扣减库存 */ @Component @Slf4j @RocketMQMessageListener(topic = "ORDER",consumerGroup = "huozhexiao-b",messageModel = MessageModel.BROADCASTING) public class OrderListener implements RocketMQListener<String> { @Override public void onMessage(String s) { Stock stock =new Stock(); log.info("订单消息变更,s:{}",s); log.info("库存数量为,num:{}",stock.getStockNum()); Order order = JSONArray.parseObject(s, Order.class); stock.setStockNum(stock.getStockNum() - order.getNum()); log.info("order:{}",order); log.info("扣减后库存数量为,num:{}",stock.getStockNum()); } }
-
物流服务
生成物流
@Component @Slf4j @RocketMQMessageListener(topic = "ORDER",consumerGroup = "huozhexiao-b",messageModel = MessageModel.BROADCASTING) public class OrderListener implements RocketMQListener<String> { @Override public void onMessage(String s) { Order order = JSONArray.parseObject(s,Order.class); log.info("生成订单信息:s{}",order); } }
问题:库存不足怎么办?
方案1:
事务回滚,告诉用户,库存不足,金钱原路返回
方案2:
用户提交订单时,不急于让用户付款,利用收发消息延时机制,用户付款时等待30s-1min检测库存是否不充足,若不充足,订单生成失败…
- 学习来自于西安加中实训
- 参考博客:RocketMQ 概述