8.18日学习打卡
目录:
- 8.18日学习打卡
- RocketMQ
- 什么是RocketMQ
- 生产者和消费者
- 技术架构
- RocketMQ安装与配置
- 环境搭建与测试
- RocketMQ管理命令
- RocketMQ发送消息
- 普通消息
- 顺序消息之全局消息
- 顺序消息之局部消息
- 消费者消费消息
- 延迟消息
- 延迟消息代码实现
- 单向消息
- 批量消息
- 过滤消息
- 过滤消息之Tag过滤
- 集群消费
- 广播消息
- 可视化配置
- 消息存储
- 消息查询
- SpringBoot接入RocketMQ生产者
- SpringBoot接入RocketMQ消费者
RocketMQ
什么是RocketMQ
RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
应用场景
RocketMQ在阿里集团也被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
所拥有的功能
- 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点;
- Producer、Consumer、队列都可以分布式;
- Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;
- 能够保证严格的消息顺序;
- 提供丰富的消息拉取模式;
- 高效的订阅者水平扩展能力;
- 实时的消息订阅机制;
- 亿级消息堆积能力;
- 较少的依赖。
生产者和消费者
生产者负责生产消息,一般由业务系统负责生产消息,消费者即后台系统,它负责消费消息。
消息模型(Message Model)
消息模型主要有队列模型和发布订阅模型,RabbitMQ采用的是队列模型,如下图所示
RocketMQ采用发布订阅模型,模型如图所示:
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
队列
存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。
技术架构
RocketMQ架构上主要分为四部分,如上图所示:
NameServer
NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。
主要包括两个功能:
- Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。
路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。Producer和Conumser通过* NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。
1、路由发现
2、路由剔除
3、路由注册
4、客户端选择NameServer的策略
Broker
Broker充当着消息的中转角色,负责存储消息、转发消息。Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括 消费者组消费进度偏移offset、主题、队列等。
Producer
消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列(先选择Broker,再选择队列)进行消息投递,投递的过程支持快速失败并且低延迟。
Consumer
消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。
RocketMQ中的消息消费者都是以消费者组(ConsumerGroup)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是相同Topic类型(可以是一个,可以是多个,但是要相同)、并且是相同的tag(可以是一个,可以是多个,但是要相同)(保证订阅关系的一致性)。消费者组使得在消息消费方面,容易实现
RocketMQ安装与配置
环境搭建与测试
下载安装Apache RocketMQ
修改环境变量
打开/etc/profile文件
vim /etc/profile
# 文件末尾追加改信息
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin
# 生效环境变量
source /etc/profile
解压文件
unzip rocketmq-all-5.1.4-bin-release.zip
启动NameServer
nohup sh mqnamesrv &
启动broker
nohup sh ./mqbroker -n localhost:9876 &
问题解决
1、java.lang.IllegalAccessError: class org.apache.rocketmq.common.UtilAll (in unnamed module @0x58acb723) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module
解决:添加启动参数
修改runbroker文件,添加红色参数 $JAVA ${JAVA_OPT} --add-exports=java.base/sun.nio.ch=ALL-UNNAMED $@
2、堆空间初始值太大也报错
修改文件runbroker.sh
通过上述修改,将初始堆内存512M,最大堆内存设置为512M,新生代(Java中用于存储创建对象的部分)设置为256M,修改完成后便可以正常启动以及查看日志。
RocketMQ管理命令
Topic命令
1、updateTopic
作用:修改或创建一个Topic
命令:mqadmin updateTopic -b | -c [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ]
参数:
-n: name server地址列表
-c: cluster 名称,表示topic 建在该集群
-t: 设置topic名称
-h: 打印help信息
-o: 设置topic是否为有序的 取值:true、false(默认)
-p: 设置topic的权限
示例:
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic
2、deleteTopic
作用:从broker和nameserver删除topic
命令:mqadmin deleteTopic -c [-h] [-n ] -t
参数:
-n: name server地址列表
-c: cluster 名称,表示topic 建在该集群
-t: 设置topic名称
-h: 打印help信息
示例:
mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t testtopic
3、topicList
作用:从nameserver列出所有topic
命令:mqadmin topicList [-c] [-h] [-n ]
参数:
-n: name server地址列表
-c: cluster 名称,表示topic 建在该集群
-h: 打印help信息
示例:
mqadmin topicList -n localhost:9876
4、topicStatus
作用:检查topic的状态信息
命令:mqadmin topicStatus [-h] [-n ] -t
参数:
-n: name server地址列表
-c: cluster 名称,表示topic 建在该集群
-t: 设置topic名称
-h: 打印help信息
示例
mqadmin topicStatus -n localhost:9876 -t testtopic
5、cleanUnusedTopic
作用:清理未使用的topic
命令:mqadmin cleanUnusedTopic [-b ] [-c ] [-h] [-n ]
参数:
-n: name server地址列表
-b: broker地址
-c: 集群名称
-h: 打印help信息
关闭namesrv和broker服务
mqshutdown namesrv
mqshutdown broker
RocketMQ发送消息
普通消息
创建jjy-rocketmq工程
添加依赖
在pom.xml文件中添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
配置文件
spring:
application:
# 应用名字
name: springboot_rocketmq_producer
测试类
package com.jjy.sendmessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* 1、发送普通消息
* 普通消息,也叫并发消息,是发送效率最高、使用场景最多的一类消息。
*/
@Slf4j
public class SimpleTest {
@Test
public void producer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("192.168.47.100:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("TestTopic", "Tags", (i + "_syncProducer").getBytes(StandardCharsets.UTF_8));
SendResult send = producer.send(message);
System.out.println(i + "消息发送成功:" + send);
}
producer.shutdown();
}
@Test
public void consumer() throws MQClientException, InterruptedException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simpleconsmer");
consumer.setNamesrvAddr("192.168.47.100:9876");
consumer.subscribe("TestTopic","*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( n -> {
System.out.println("消费成功" + n);
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Thread.sleep(Long.MAX_VALUE);
}
}
顺序消息之全局消息
什么是顺序消息
消息有序指的是,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费(FIFO)。
举个容易理解的例子:
通常创建订单后,会经历一系列的操作:【订单创建 -> 订单支付 -> 订单发货 -> 订单配送 ->
订单完成】。在创建完订单后,会发送五条消息到MQ Broker中,消费的时候要按照【订单创建 -> 订单支付 -> 订单发货 -> 订单配送
-> 订单完成】这个顺序去消费,这样的订单才是有效的。
顺序消息的原理
在默认的情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
全局顺序消息
全局顺序消息的话,我们需要将所有消息都发送到同一个队列,然后消费者端也订阅同一个队列,这样就能实现顺序消费消息的功能。下面通过一个示例说明如何实现全局顺序消息。
顺序消息之局部消息
生产者发送消息
public class OrderMQProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {
// 创建DefaultMQProducer类并设定生产者名称
DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
// 设置NameServer地址,如果是集群的话,使用分号;分隔开
mqProducer.setNamesrvAddr("10.0.90.86:9876");
// 启动消息生产者
mqProducer.start();
List<Order> orderList = getOrderList();
for (int i = 0; i < orderList.size(); i++) {
String body = "【" + orderList.get(i) + "】订单状态变更消息";
// 创建消息,并指定Topic(主题),Tag(标签)和消息内容
Message msg = new Message("ORDER_STATUS_CHANGE", "", body.getBytes(RemotingHelper.DEFAULT_CHARSET));
// MessageQueueSelector: 消息队列选择器,根据业务唯一标识自定义队列选择算法
/**
* msg:消息对象
* selector:消息队列的选择器
* arg:选择队列的业务标识,如本例中的orderId
*/
SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {
/**
* @param mqs 队列集合
* @param msg 消息对象
* @param arg 业务标识的参数,对应send()方法传入的第三个参数arg
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//根据arg(实际上是订单id)选择消息发送的队列
long index = (Long) arg % mqs.size();
return mqs.get((int) index);
}
//mqProducer.send()方法第三个参数, 会传递到select()方法的arg参数
}, orderList.get(i).getOrderId());
System.out.println(String.format("消息发送状态:%s, orderId:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
orderList.get(i).getOrderId(),
sendResult.getMessageQueue().getQueueId(),
body));
}
// 如果不再发送消息,关闭Producer实例
mqProducer.shutdown();
}
/**
* 订单状态变更流程: ORDER_CREATE(订单创建) -> ORDER_PAYED(订单已支付) -> ORDER_COMPLETE(订单完成)
*/
public static List<Order> getOrderList() {
List<Order> orderList = new ArrayList<>();
Order orderDemo = new Order();
orderDemo.setOrderId(1L);
orderDemo.setOrderStatus("ORDER_CREATE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(2L);
orderDemo.setOrderStatus("ORDER_CREATE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(1L);
orderDemo.setOrderStatus("ORDER_PAYED");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(2L);
orderDemo.setOrderStatus("ORDER_PAYED");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(2L);
orderDemo.setOrderStatus("ORDER_COMPLETE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(3L);
orderDemo.setOrderStatus("ORDER_CREATE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(4L);
orderDemo.setOrderStatus("ORDER_CREATE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(3L);
orderDemo.setOrderStatus("ORDER_PAYED");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(1L);
orderDemo.setOrderStatus("ORDER_COMPLETE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(3L);
orderDemo.setOrderStatus("ORDER_COMPLETE");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(4L);
orderDemo.setOrderStatus("ORDER_PAYED");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(4L);
orderDemo.setOrderStatus("ORDER_COMPLETE");
orderList.add(orderDemo);
return orderList;
}
}
public class Order implements Serializable {
/**
* 订单ID
*/
private Long orderId;
/**
* 订单状态
*/
private String orderStatus;
public Long getOrderId() {
return orderId;
}
public void setOrderId(Long orderId) {
this.orderId = orderId;
}
public String getOrderStatus() {
return orderStatus;
}
public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", orderStatus='" + orderStatus + '\'' +
'}';
}
}
消费者消费消息
public class OrderMQConsumer {
public static void main(String[] args) throws MQClientException {
// 创建DefaultMQPushConsumer类并设定消费者名称
DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
// 设置NameServer地址,如果是集群的话,使用分号;分隔开
mqPushConsumer.setNamesrvAddr("192.168.47.100:9876");
// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果不是第一次启动,那么按照上次消费的位置继续消费
mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*
mqPushConsumer.subscribe("ORDER_STATUS_CHANGE", "*");
// 注册回调实现类来处理从broker拉取回来的消息
// 注意:顺序消息注册的是MessageListenerOrderly监听器
mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext consumeOrderlyContext) {
consumeOrderlyContext.setAutoCommit(true);
for (MessageExt msg : msgList) {
// 每个queue有唯一的consume线程来消费, 订单对每个queue都是分区有序
System.out.println("消费线程=" + Thread.currentThread().getName() +
", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 标记该消息已经被成功消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者实例
mqPushConsumer.start();
}
}
延迟消息
什么是延迟消息
延迟消息顾名思义不是用户能立即消费到的,而是等待一段特定的时间才能收到。
举例如下场景比较适合使用延时消息:
订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。
在RocketMQ中使用延迟消息
不像其他延迟消息的实现,客户端可以自定义延迟时间,而RocketMQ则不支持任意时间的延迟,它提供了18个级别(延迟时间选择)。
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
延迟消息代码实现
/**
*
* 3、发送延时消息
* 生产者发送消息后,消费者在指定时间才能消费消息,这类消息
* 被称为延迟消息或定时消息。生产者发送延迟消息前需要设置延迟级
* 别,目前开源版本支持18个延迟级别:Broker在接收用户发送的消息
* 后,首先将消息保存到名为SCHEDULE_TOPIC的Topic中。此时,
* 消费者无法消费该延迟消息。然后,由Broker端的定时投递任务定时
* 投递给消费者。 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m
*/
public class ScheduleTest {
@Test
public void producer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 1、初始化生产者,配置生产者参数,启动生产者
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("192.168.47.100:9876");
producer.start();
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 10; j++) {
// 2、初始化消息对象 topic名字 tags: 消息过滤词
Message message = new Message("Schedule","Tags",(i+"_ScheduleProducer").getBytes(StandardCharsets.UTF_8));
//1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//message.setDelayTimeLevel(1);
//十秒之后发送
message.setDelayTimeMs(10000L);
producer.send(message);
System.out.println("消息发送成功_"+ LocalTime.now());
}
}
producer.shutdown();
}
@Test
public void consumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduleConsumer");
consumer.setNamesrvAddr("192.168.47.100:9876");
consumer.subscribe("Schedule","*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < list.size(); i++) {
System.out.println(i+"_消息消费成功_"+ LocalTime.now());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer started.%n");
}
}
单向消息
什么是单向消息
单向消息的生产者只管发送过程,不管发送结果。
单向消息应用场景:
主要用于日志传输等消息允许丢失的场景。
代码示例
package com.jjy.sendmessage;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
/**
* 5、发送单向消息
* 单向消息的生产者只管发送过程,不管发送结果。单项消息主要
* 用于日志传输等消息允许丢失的场景
*/
public class OnewayTest {
@Test
public void Producer() throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("192.168.47.100:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message message = new Message("Simple","Tags",(i+"_SyncProducer").getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
System.out.printf(i+"_消息发送成功%n");
}
producer.shutdown();
}
}
批量消息
什么是批量消息
Rocket MQ的批量消息可以提高消息的吞吐能力和处理效率,降低下游系统的API调用频率,是一种优化消息传输和处理的有效手段。
批量消息的特点
批量消息具有相同的topic
批量消息不支持延迟消息
批量消息的大小不超过4M(4.4版本之后要求不超过1M)
批量消息的使用场景
- 消息的吞吐能力和处理效率:通过将多条消息打包成一批进行发送,可以减少网络传输开销和消息处理的时间,从而提高整体的消息处理效率。
- 下游系统的API调用频率:通过将多条消息合并成一条批量消息进行发送,可以减少下游系统接收和处理消息的次数,从而降低API调用频率,减轻下游系统的负载压力。
代码示例
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
producer.setNamesrvAddr("192.168.47.100:9876");
producer.start();
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
Message message = new Message("BatchTest","Tags",("OrderID" + i).getBytes(StandardCharsets.UTF_8));
messages.add(message);
}
producer.send(messages);
producer.shutdown();
生产者最佳实践总结
相对消费者而言,生产者的使用更加简单,一般主要关注消息类型、消息发送方法即可正常使用RocketMQ发送消息。
常用消息类型
消息类型 | 优点 | 缺点 | 备注 |
---|---|---|---|
普通消息 (并发消息) | 性能最好。单机 TPS 的级别为100000 | 消息的生产和消费都无序 | 大部分场景适用 |
分区有序消息 | 单分区中消息有序,单机发送 TPS万级别 | 单点问题。如果 Broker宕机则会导致发送失败 | 大部分有序消息场景适用 |
全局有序消息 | 类似传统的 Queue,全部消息有序,单机发送 TPS 千级别 | 单点问题。如果 Broker宕机则会导致发送失败 | 极少场景使用 |
延迟消息 | RocketMO 自身支持,不需要额使用组件,支持延迟特性 | 不能根据任意时间延迟,使用范围受限。Broker 随着延迟级别增大支持越多,CPU压力越大,延迟时间不准确 | 非精确、延迟级别不多的场景,非常方便使用 |
事务消息 | RocketMQ 自身支持,不需要额外使用组建支持事务特性 | RocketMO 事务是生产者事务,只有生产者参与,如果消费者处理失败则事务失效 | 简单事务处理可以使用 |
常用的发送方法
发送方法 | 优点 | 缺点 | 备注 |
---|---|---|---|
send(Message msg) 同步 | 最可靠 | 性能最低 | 适用于高可靠场景 |
send(Message msg,MessageQueue mq) | 可以发送顺序消息 | 单点故障后不可用 | 适用于顺序消息 |
send(Message msg,MessageQueueSelectorselector, Object arg) | 队列选择方法最灵活 | 比较低级的接口,使用有门槛 | 特殊场景 |
sendOneway(Message msg) | 使用最方便 | 消息有丢失风险 | 适用于对消息丢失不敏感的场景 |
send(Collection msgs) | 效率最高 | 发送失败后降级比较困难 | 适用于特殊场景 |
过滤消息
你去市场买菜,阿姨给你一个二维码付款,支持某信、某付宝、某银联App等。假如你被要求设计这个二维码的功能,在一个支付请求进入Topic后,如果你只需要处理某付宝的消息,你的代码得这么写:
boolean receiveMessage(Message msg)
{
String from = msg.getProperty("from");
if ( "某付宝".equals(from) ){
callApiXbao(msg);
}else{
log.error("请求异常" + JSON.toJsonString(msg));
}
}
由于阿姨的生意非常好,每天有上亿的支付订单,你的应用程序要处理某信、某付宝、某银联App全部的支付消息,这会导致消费效率低下,我们有什么办法,可以只消费某付宝的支付消息呢?
解决:
RocketMQ 设计了消息过滤,来解决大量无意义流量的传输:即对于客户端不需要的消息,Broker就不会传输给客户端,以免浪费宽带。
消息过滤分类
RocketMQ 版支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:
对比项 | Tag标签过滤 | SQL属性过滤 |
---|---|---|
过滤目标 | 消息的Tag标签。 | 消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。 |
过滤能力 | 精准匹配。 | SQL语法匹配。 |
适用场景 | 简单过滤场景、计算逻辑简单轻量。 | 复杂过滤场景、计算逻辑较复杂。 |
应用场景
在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只关注自身逻辑需要的消息子集。使用消息队列 RocketMQ 版的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。
过滤消息之Tag过滤
什么是Tag过滤
Tag 过滤是最简单的一种过滤方法,通常 Tag 可以用作消息的业务标识。可以设置 Tag 表达式,判断消息是否包含这个 Tag。
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息:
-
订单消息
-
支付消息
-
物流消息
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的下游系统所订阅: -
支付系统:只需订阅支付消息。
-
物流系统:只需订阅物流消息。
-
交易成功率分析系统:需订阅订单和支付消息。
-
实时计算系统:需要订阅所有和交易相关的消息。
过滤效果如下图所示:
代码示例
/**
* 过滤消息生产者-tag方式
* @throws MQClientException
* @throws MQBrokerException
* @throws RemotingException
* @throws InterruptedException
*/
@Test
public void tagproduce() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("FilterTagProducer");
producer.setNamesrvAddr("192.168.47.100:9876");
producer.start();
String[] tags = new String[] {"TagA","TagB","TagC"};
for (String tag : tags) {
Message message = new Message("Filter",tag,(tag+"_FilterTagProducer").getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message);
}
producer.shutdown();
}
/**
* 过滤消息生产者-tag方式
* @throws MQClientException
*/
@Test
public void tagconsumer () throws MQClientException, InterruptedException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleConsumer");
consumer.setNamesrvAddr("192.168.47.100:9876");
consumer.subscribe("Filter","TagA || TagC");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < list.size(); i++) {
System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Thread.sleep(Integer.MAX_VALUE);
}
集群消费
什么是集群消费
同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。
代码示例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleConsumer");
consumer.setNamesrvAddr("192.168.47.100:9876");
consumer.subscribe("SimpleMessageTest","*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < list.size(); i++) {
System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Thread.sleep(Integer.MAX_VALUE);
广播消息
什么是广播消费
当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。
示例代码
@Test
public void consumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleConsumer");
consumer.setNamesrvAddr("192.168.47.100:9876");
consumer.subscribe("Simple","*");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < list.size(); i++) {
System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer started.%n");
}
可视化配置
下载地址
https://www.redisant.cn/rocketmq?spm=a2c6h.12873639.article-detail.7.b9ee4f500GjzZr
登录
实时查看您的 RocketMQ 健康指标
支持丰富的数据格式
RocketMQ Assistant 会自动识别并格式化不同的数据格式,包括Text、JSON、XML、YAML、HEX、MessagePack,以及各种整数、浮点类型。
快速查看与发布消息
可以从主题的最开始、指定时间戳或指定偏移处开始消费消息、过滤消息;发布消息时配合数据模板一次发送数千条消息进行性能测试,以了解系统如何处理负载
实时查看主题的消息情况
使用 RocketMQ Assistant,您可以快速查看并更新Topic配置;管理消费者组,重置偏移量,或者查看它们订阅的主题与分区。
查看消费者组
查看组内的每个消费者订阅的主题与队列,以及当前消费位置和延迟;支持跳过消息堆积、根据时间戳重置偏移量。
消息轨迹
根据消息ID或消息Key追踪消息,了解消息从生产、存储到消费的详细过程。
消息存储
关系型数据库
ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。
文件
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题。
性能对比
文件系统 > 关系型数据库DB
消息查询
在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。
//返回结果
SendResult [
sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0],
queueOffset=0]
- 按MessageId查询消息
Message Id 是消息发送后,在Broker端生成的,其包含了Broker的地址、偏移信息,并且会把Message Id作为结果的一部分返回。Message Id中属于精确匹配,代表唯一一条消息,查询效率更高。
- 按照Message Key查询消息
消息的key是开发人员在发送消息之前自行指定的,通常把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等。
- 按照Unique Key查询消息
除了开发人员指定的消息key,生产者在发送发送消息之前,会自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一代表一条消息。
消息在消息队列RocketMQ中存储的时间默认为3天(不建议修改),即只能查询从消息发送时间算起3天内的消息,三种查询方式的特点和对比如下表所述:
查询方式 | 查询条件 | 查询类别 | 说明 |
---|---|---|---|
按Message ID | Message ID | 精确查询 | 根据Message ID可以精确定位任意一条消息,获取消息的属性。 |
按Message Key | Topic+Message Key | 模糊查询 | 根据Topic和Message Key可以匹配到包含指定Key的最近64条消息。注意 建议消息生产方为每条消息设置尽可能唯一的Key,以确保相同的Key的消息不会超过64条,否则消息会漏查。 |
按Unique Key | Unique Key | 精确查询 | 与Message Id类似,匹配唯一一条消息,如果生产端重复发送消息,则有可能匹配多条,但它们的unique key是唯一的。 |
推荐查询过程
SpringBoot接入RocketMQ生产者
添加maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
添加生产者spring配置
rocketmq:
# nameserver地址
name-server: 192.168.47.100:9876
producer:
# 生产组
group: my-group1
# 发送消息超时时间
send-message-timeout: 300000
添加生产者代码
生产者rocketMQTemplate会根据配置的Namesrv地址自动生成一个bean注入spring容器中,我们在使用的时候直接添加@Resource 或者@Autowired 注解即可。当前的版本支持直接发送一个对象或字符串,RocketMQ 使用 JSON 作为序列化方式进行传输。
package com.jjy.produce;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class MessageProduce {
@Autowired
private RocketMQTemplate rocketMQTemplate; // 直接注入生产者
@Value("${demo.rocketmq.topic}")
private String topic;
/**
* 发送消息
* @param topic 主题
* @param message 消息
* @return
*/
public SendResult sendMessage( String message){
return rocketMQTemplate.syncSend(topic, message);
}
}
RocketMQ SpringBoot 3.0不兼容解决方案
报错信息
*************************** APPLICATION FAILED TO START ***************************
Description:
Field rocketMQTemplate in com.dayuwebtech.dayupay.common.rocketmq.impl.SendPayReNotifyMessageImpl required a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.
The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=true)
Action: Consider defining a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' in your configuration.
我们要在resources文件夹中,新建META-INF/spring文件夹,在里面新建一个叫org.springframework.boot.autoconfigure.AutoConfiguration.imports的文件里面填入 org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
SpringBoot接入RocketMQ消费者
消费代码
消费者主要使用RocketMQMessageListener接口进行监听配置。
package com.jjy.consumer;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}",
consumerGroup = "${demo.rocketmq.consumer}",
// 过滤方式,默认为Tag过滤。
selectorType = SelectorType.TAG,
//消费模式,有顺序消费、并发消费。
consumeMode = ConsumeMode.ORDERLY,
// 消息模式,有集群消费、广播消费
messageModel = MessageModel.CLUSTERING,
// default″*:过滤值,默认 为全部消费,不过滤。
selectorExpression = "a"
)
public class MessagConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println(s);
}
}
如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力