文章目录
- 一、消息队列简介
- 1.1 什么是消息队列
- 1.2 常见消息队列对比
- 1.3 RockectMQ 核心概念
- 1.4 RockectMQ 工作机制 (★)
- 二、RocketMQ 部署相关
- 2.1 服务器单机部署
- 2.2 管控台页面
- 三、RocketMQ 的基本使用
- 3.1 入门案例
- 3.2 消息发送方式
- 3.2.1 同步消息
- 3.2.2 异步消息
- 3.2.3 一次性消息
- 3.3 消息消费方式
- 3.3.1 集群模式
- 3.3.2 广播模式
- 3.4 顺序消息
- 3.5 延迟消息
- 3.6 消息过滤
- 3.6.1 Tag 过滤
- 3.6.2 SQL92 过滤
- 四、SpringBoot 集成 RocketMQ
- 4.1 入门案例
- 4.2 消息发送方式
- 4.2.1 同步消息
- 4.2.2 异步消息
- 4.2.3 一次性消息
- 4.3 消息消费方式
- 4.3.1 集群模式
- 4.3.2 广播模式
- 4.4 顺序消息
- 4.5 延时消息
- 4.6 消息过滤
- 4.6.1 Tag 过滤
- 4.6.2 SQL92 过滤
一、消息队列简介
1.1 什么是消息队列
消息队列(MQ)也叫消息队列中间件,其主要通过消息的发送和接受来实现程序的异步解耦、削峰填谷以及数据分发,但是 MQ 真正的目的是为了通讯。他屏蔽了复杂的通讯协议,像常用的 dubbo、http 协议都是同步的。这两种协议很难实现双端通讯(即:A调用B,B也可以主动调用A),而且不支持长链接。MQ 做的就是在这些协议上构建一个简单协议——生产者、消费者模型,MQ 带给我们的不是底层的通讯协议,而是更高层次的通讯模型。他定义了两个对象:发送数据的叫做生产者,接受消息的叫做消费者,我们可以无视底层的通讯协议,并且可以自己定义生产者消费者。
参考:消息队列详解
1.2 常见消息队列对比
1.3 RockectMQ 核心概念
生产者 Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。
消费者 Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
名字服务 Name Server:名称服务充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个Namesrv 实例组成集群,但相互独立,没有信息交换。
代理服务器 Broker Server:消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
消息主题 Topic:表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。
消息队列 MessageQueue:对于每个 Topic 都可以设置一定数量的消息队列用来进行数据的读取。
消息内容 Message:消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。
标签 Tag:为消息设置的标志,用于同一主题 Topic 下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。
1.4 RockectMQ 工作机制 (★)
RockectMQ 有自己的注册中心,即 NameServer,连接命名服务后会拉取代理服务器的列表到本地缓存,生产者会通过负载均衡选出代理服务器的具体 IP,然后向选出的代理服务器发送消息,最终发送给消费者进行消费。如果发送的消息含有标签Tag,那么会在消费者消费时进行消息的过滤。
其中,每个 Topic 默认有 4 个 MessageQueue,即 4 个写和读队列。在消息中间件每个 topic 设置 4 个队列,主要是为了解决并发性能的问题。如果只有一个队列,为保证线程安全,必须得给队列进行写操作时上锁。设置 4 个队列也是由于大部分的服务器核心数都是 4 核的。
二、RocketMQ 部署相关
2.1 服务器单机部署
搜索资源:rocketmq-all-4.4.0-bin-release.zip
① 将压缩包上传服务器,把rocketmq-all-4.4.0-bin-release.zip
拷贝到 /usr/local/software
② 使用解压命令进行解压到 /usr/local
目录
unzip /usr/local/software/rocketmq-all-4.4.0-bin-release.zip -d /usr/local
③ 软件文件名重命名
mv /usr/local/rocketmq-all-4.4.0-bin-release/ /usr/local/rocketmq-4.4/
④ 设置环境变量
vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8
export ROCKETMQ_HOME=/usr/local/rocketmq-4.4
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
修改环境变量后,需要 source /etc/profile
使配置文件生效。
⑤ 修改脚本中的 JVM 相关参数和启动参数的配置
vi /usr/local/rocketmq-4.4/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
vi /usr/local/rocketmq-4.4/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
⑥ 修改配置文件,设置 Nameserver 和 Broker-server 部署机器的IP地址。
vi /usr/local/rocketmq-4.4/conf/broker.conf
注:如果是服务器本身可以不设置
⑦ 启动 NameServer
# 1.启动NameServer,& 代表后台输出
nohup sh mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
⑧ 启动 Broker
#1.启动Broker
# nohup sh mqbroker -n 部署的IP地址:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &
nohup sh mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf &
#2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
⑨ 使用 jps
命令查看是否开启成功,如果看到NamesrvStartup
和BrokerStartup
这两个进程,则证明启动成功。
关闭 nameserver:sh mqshutdown namesrv
关闭 broker:sh mqshutdown broker
另外:服务器需要暂时关闭防火墙 systemctl stop firewalld
,并可使用 firewall-cmd --state
查看防火墙状态。
具体可参考:Linux关闭防火墙命令
2.2 管控台页面
搜索资源:rocketmq-console-ng-1.0.1.jar
在 jar 包的文件夹下新建一个配置文件 application.properties,编辑管控台的端口和 NameServer 中心的 IP 地址,使用 java -jar rocketmq-console-ng-1.0.1.jar
启动即可。
访问 http://localhost:9999/#/
管控台界面如下:
注:管控台要求 jdk1.8。
三、RocketMQ 的基本使用
在一个工程中创建两个模块,模拟生产者和消费者:
3.1 入门案例
添加 rocketmq 的 pom 依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
生产者模块生产消息:
public class Producer {
public static void main(String[] args) throws Exception {
// 定义一个生产者对象
DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
// 连接nameSever
producer.setNamesrvAddr("192.168.63.101:9876");
// 启动生产者
producer.start();
// 设置消息发送的目的地
String topic = "helloTopic";
// 发送消息
for (int i = 0; i < 3; i++) {
Message message = new Message(topic, ("RocketMQ普通消息:" + i).getBytes(Charset.defaultCharset()));
// 发送完成之后会返回响应结果
SendResult result = producer.send(message);
System.out.println("发送状态:" + result.getSendStatus());
}
System.out.println("消息发送完毕");
// 关闭资源
producer.shutdown();
}
}
消费者模块消费消息:
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
// 设置nameSever地址
consumer.setNamesrvAddr("192.168.63.101:9876");
// 设置订阅的主题
consumer.subscribe("helloTopic", "*"); // * 消息不过滤
// 设置消费模式,默认集群
// consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置消息的监听器
consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
String content = new String(msg.getBody(), Charset.defaultCharset());
System.out.println("线程:" + Thread.currentThread() + "消息内容:" + content);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 通知MQ消费正常
// return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 通知MQ消费失败,消费者会重新消费
}
});
// 启动的消费者
consumer.start();
}
}
3.2 消息发送方式
3.2.1 同步消息
可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。上面演示的案例,就是同步消息发送方式。
应用程序给消息中间件发送消息的时候,需要等待消息中间件将消息存储完毕后才响应回去,业务代码才能往下执行。
发送方式:SendResult result = producer.send(msg);
上面演示的案例就是同步发送。
3.2.2 异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。
应用程序发送消息,消息中间件收到这个消息之后,直接给应用程序响应(此时消息并没有完全存储到磁盘),消息中间件继续存储消息,通过回调地址通知有应用程序存储的结果(成功或失败)。
发送方式:
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
...
}
@Override
public void onException(Throwable throwable) {
...
}
});
public class AsynchronousProducer {
public static void main(String[] args) throws Exception {
// 定义一个生产者对象
DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
// 连接nameSever
producer.setNamesrvAddr("192.168.63.100:9876");
// 启动生产者
producer.start();
// 设置消息发送的目的地
String topic = "helloTopic";
// 发送消息
Message message = new Message(topic, ("RocketMQ异步消息").getBytes(Charset.defaultCharset()));
System.out.println("消息发送前");
// 异步发送,需要传递异步回调消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息存储状态:" + sendResult.getSendStatus());
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息发送出现异常");
}
});
System.out.println("消息发送完毕");
TimeUnit.SECONDS.sleep(5); // 为了等回调消息,模拟程序睡眠5s后关闭资源
// 关闭资源
producer.shutdown();
}
}
3.2.3 一次性消息
一次性消息主要用在不特别关心发送结果的场景,例如:日志发送。
发送方式:producer.sendOneway(message);
应用程序给消息中间件发送消息的时候,不需要知道消息是否在消息中间存储了,只管发就是了。
public class OneTimeProducer {
public static void main(String[] args) throws Exception {
// 定义一个生产者对象
DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
// 连接nameSever
producer.setNamesrvAddr("192.168.63.100:9876");
// 启动生产者
producer.start();
// 设置消息发送的目的地
String topic = "helloTopic";
// 发送消息
Message message = new Message(topic, ("RocketMQ一次性消息").getBytes(Charset.defaultCharset()));
System.out.println("消息发送前");
// 一次性消息
producer.sendOneway(message);
System.out.println("消息发送完毕");
// 关闭资源
producer.shutdown();
}
}
3.3 消息消费方式
3.3.1 集群模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。
由于内部会根据 index++ % queue.size()
的方式来决定消息进哪个 messageQueue,因此当多个机器做集群的时候,也可能会发生消息消费分配不均等情况。如下面 topic 中一共有 10 个消息:
入门案例默认就是集群的消费方式:
3.3.2 广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。
设置消费模式:consumer.setMessageModel(MessageModel.BROADCASTING);
3.4 顺序消息
从上文的消费结果来看,在集群状态下,消息的消费顺序是乱序的,但有些场景是要求消息的消费是有序的,这要怎么实现呢?我们考虑以下两个场景:
① 如果在消费者做集群的情况下,由于消息会分散在不同的队列中,因此消息不可保证顺序消费,如:第四个消息比第一个消息更早被消费。因此,可以考虑将消息全放在一个队列中。
注:一个队列只会被一个消费者实例消费,一个消费者实例可以消费多个队列。
② 我们设置消费者的监听模式的时候使用的是 MessageListenerConcurrently 即多线程并发消费的形式,那么当消息全存储在一个队列时,由于 CPU 执行权等问题,消费者实例中多线程会并发的进行消费,也不会保证顺序消费。
// 一个队列对应一个实例的多个线程
consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
...
}
});
因此,可以使用 MessageListenerOrderly 让一个队列只对应一个线程。
// 从什么地方开始消费,队头开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 一个队列对应一个实例的一个线程
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {
...
}
});
总结:如果想要实现顺序消费,在生产者的角度将消息存储在一个队列中,在消费者的角度就是将消息对应消费者实例里的一个线程。
顺序消费案例,生产者代码如下:
public class Producer {
public static void main(String[] args) throws Exception {
// 定义一个生产者对象
DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
// 连接nameSever
producer.setNamesrvAddr("192.168.63.100:9876");
// 启动生产者
producer.start();
// 设置消息发送的目的地
String topic = "orderTopic";
List<OrderStep> orderSteps = OrderUtil.buildOrders();
// 设置队列的选择器
// 将需要顺序消费的消息存储到同一个队列中
MessageQueueSelector selector = new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
System.out.println("队列的个数:" + list.size()); // 队列个数:4
Long orderId = (Long)o; // 传入的参数
int index = (int)(orderId % list.size());
return list.get(index);
}
};
// 发送消息
for (OrderStep orderStep : orderSteps) {
Message msg = new Message(topic, orderStep.toString().getBytes(Charset.defaultCharset()));
// 指定消息选择器,传入的参数
producer.send(msg, selector, orderStep.getOrderId()); // 将订单号传入选择器
}
System.out.println("消息发送完毕");
// 关闭资源
producer.shutdown();
}
}
消费者代码如下:
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyProducerGroup");
// 设置nameSever地址
consumer.setNamesrvAddr("192.168.63.101:9876");
// 设置订阅的主题
consumer.subscribe("orderTopic", "*");
// 从什么地方开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 一个队列对应的一个线程
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,
ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : list) {
System.out.println("当前线程ID:" + Thread.currentThread().getId() + ",队列ID:" + msg.getQueueId() + ",消息内容:"
+ new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动的消费者
consumer.start();
}
}
模拟数据:
@Setter
@Getter
public class OrderStep {
private long orderId;
private String desc;
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
public class OrderUtil {
/**
* 生成模拟订单数据
*/
public static List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(101L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(102L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(101L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(103L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(102L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(103L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(102L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(101L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(103L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(101L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
控制台打印:
从控制台打印效果来看,102 订单全部存储在 ID 为 2 的队列当中,并且实现了顺序消费。
3.5 延迟消息
延时消息是 RocketMQ 延时发送给消费者消费的消息,典型应用场景如:订单超时未支付等。其不支持任意时间的延时,需要设置几个固定的延时等级,从 1s 到 2h 分别对应着等级 1 到 18。
等级 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
延时 | 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 定义一个生产者对象
DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
// 连接nameSever
producer.setNamesrvAddr("192.168.63.101:9876");
// 启动生产者
producer.start();
// 设置消息发送的目的地
String topic = "helloTopic";
// 发送消息
SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Message message =
new Message(topic, ("延时消息,发送时间:" + cusFormat.format(new Date())).getBytes(Charset.defaultCharset()));
// 设置消息延时级别
message.setDelayTimeLevel(3);
// 发送完成之后会返回响应结果
SendResult result = producer.send(message);
System.out.println("消息发送完毕");
// 关闭资源
producer.shutdown();
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delayConsumerGroup");
// 设置nameSever地址
consumer.setNamesrvAddr("192.168.63.101:9876");
// 设置订阅的主题
consumer.subscribe("helloTopic", "*");
// 设置消息的监听器
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (MessageExt msg : list) {
System.out.println("消费时间:" + cusFormat.format(new Date()) + ",消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动的消费者
consumer.start();
}
}
控制台打印:
3.6 消息过滤
3.6.1 Tag 过滤
RocketMQ 的消息标签(Message Tag)是一种简单的路由机制,允许消费者根据标签来过滤并只消费感兴趣的消息。要实现消息标签的过滤,需要在发送消息时设置标签,并在消费者端配置标签过滤器。以下示例展示如何使用的标签过滤功能:
生产者设置消息标签并发送消息:
public class Producer {
public static void main(String[] args) throws Exception {
// 定义一个生产者对象
DefaultMQProducer producer = new DefaultMQProducer("tagProducesGroup");
// 连接nameSever
producer.setNamesrvAddr("192.168.63.101:9876");
// 启动生产者
producer.start();
// 设置消息发送的目的地
String topic = "tagFilterTopic";
// 发送消息
Message message1 = new Message(topic, "TagA", ("消息A").getBytes(Charset.defaultCharset()));
Message message2 = new Message(topic, "TagB", ("消息B").getBytes(Charset.defaultCharset()));
Message message3 = new Message(topic, "TagC", ("消息C").getBytes(Charset.defaultCharset()));
producer.sendOneway(message1);
producer.sendOneway(message2);
producer.sendOneway(message3);
System.out.println("消息发送完毕");
// 关闭资源
producer.shutdown();
}
}
消费者端配置标签过滤器:
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");
// 设置nameSever地址
consumer.setNamesrvAddr("192.168.63.101:9876");
// 设置订阅的主题
consumer.subscribe("tagFilterTopic", "TagA || TagC"); // 只消费 TagA 和 TagC
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println("消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动的消费者
consumer.start();
}
}
控制台打印:
3.6.2 SQL92 过滤
RocketMQ 的 SQL92 过滤器是一种基于消息属性的条件筛选机制,允许消费者只接收满足特定条件的消息。要使用 SQL92 过滤器,需要在消费者端设置过滤条件。以下是示例,演示如何在消费者端设置 SQL92 过滤器:
生产者设添加属性 putUserProperty(key, value)
public class Producer {
public static void main(String[] args) throws Exception {
// 定义一个生产者对象
DefaultMQProducer producer = new DefaultMQProducer("sqlProducesGroup");
// 连接nameSever
producer.setNamesrvAddr("192.168.63.100:9876");
// 启动生产者
producer.start();
// 设置消息发送的目的地
String topic = "sqlFilterTopic";
// 发送消息
Message message1 = new Message(topic, "TagA", ("消息A").getBytes(Charset.defaultCharset()));
message1.putUserProperty("age", "22");
message1.putUserProperty("weight", "45");
Message message2 = new Message(topic, "TagB", ("消息B").getBytes(Charset.defaultCharset()));
message2.putUserProperty("age", "30");
message2.putUserProperty("weight", "50");
Message message3 = new Message(topic, "TagC", ("消息C").getBytes(Charset.defaultCharset()));
message3.putUserProperty("age", "15");
message3.putUserProperty("weight", "48");
producer.sendOneway(message1);
producer.sendOneway(message2);
producer.sendOneway(message3);
System.out.println("消息发送完毕");
// 关闭资源
producer.shutdown();
}
}
消费者设置过滤条件:
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 定义消息消费者(在同一个JVM中,消费者的组名不能重复)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");
// 设置nameSever地址
consumer.setNamesrvAddr("192.168.63.101:9876");
// 设置订阅的主题
consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age<25 and weight<47"));
// 一个队列对应的一个线程
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println("消息内容:" + new String(msg.getBody(), Charset.defaultCharset()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动的消费者
consumer.start();
}
}
注意:
① 过滤条件支持以下形式:
数值比较,如:>,>=,<,<=,BETWEEN,=
字符比较,如:=,<>,IN
IS NULL 或者 IS NOT NULL
逻辑符号 AND,OR,NOT
常量支持类型为:
数值,如:**123,3.1415;
字符,如:‘abc’,必须用单引号包裹起来
NULL,特殊的常量
布尔值,TRUE 或 FALSE
② 在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf,添加参数enablePropertyFilter=true,重启 broker 代理服务器。
vi /usr/local/rocketmq-4.4/conf/broker.conf
四、SpringBoot 集成 RocketMQ
4.1 入门案例
1、添加 pom 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
2、生产者模块添加配置
rocketmq:
name-server: 192.168.63.101:9876
producer:
group: my-group
3、消费者模块添加配置
rocketmq:
name-server: 192.168.63.101:9876
4、生产者模块生产消息
@SpringBootTest
public class RocketMQTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void sendMsg() {
Message<String> msg = MessageBuilder.withPayload("发送消息").build();
rocketMQTemplate.send("helloTopicBoot", msg);
}
}
注意:① Message 对象是 Spring 框架提供的对象 import org.springframework.messaging.Message;
② rocketMQTemplate.send(destination, message)
方法是同步的发送方式。
5、消费者模块消费消息
@Component
// 消费者名字叫 helloConsumerGroup,消费的生产组叫 helloTopicBoot
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot", messageModel = MessageModel.BROADCASTING)
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println("收到消息:" + new String(messageExt.getBody(), Charset.defaultCharset()));
}
}
4.2 消息发送方式
下文不阐述具体的发送细节,细节参考上文。
4.2.1 同步消息
@Test
public void sendSYNMsg() throws InterruptedException {
Message<String> msg = MessageBuilder.withPayload("发送同步消息").build();
rocketMQTemplate.syncSend("helloTopicBoot", msg);
}
4.2.2 异步消息
@Test
public void sendASYNMsg() throws InterruptedException {
Message<String> msg = MessageBuilder.withPayload("发送异步消息").build();
rocketMQTemplate.asyncSend("helloTopicBoot", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送状态:" + sendResult.getSendStatus());
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息发送失败");
}
});
TimeUnit.SECONDS.sleep(5);
}
4.2.3 一次性消息
@Test
public void sendOnewayMsg() {
Message<String> msg = MessageBuilder.withPayload("发送一次性消息").build();
rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
}
4.3 消息消费方式
这里模拟生产者发送一次性消息10次:
@Test
public void sendOnewayMsgLoop() {
for (int i = 0; i < 10; i++) {
Message<String> msg = MessageBuilder.withPayload("发送一次性消息:" + i).build();
rocketMQTemplate.sendOneWay("helloTopicBoot", msg);
}
}
4.3.1 集群模式
默认情况下,消费者采用负载均衡方式消费消息,即采用集群模式,也可以在 @RocketMQMessageListener
注解中设置 messageModel
属性来改变消费模式。
@Component
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot",
messageModel = MessageModel.CLUSTERING) // 设置消费模式
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println("收到消息:" + new String(messageExt.getBody(), Charset.defaultCharset()));
}
}
控制台打印:
4.3.2 广播模式
设置广播模式:messageModel = MessageModel.BROADCASTING
@Component
@RocketMQMessageListener(consumerGroup = "helloConsumerGroup", topic = "helloTopicBoot",
messageModel = MessageModel.BROADCASTING) // 设置消费模式
public class HelloTopicBootListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
...
}
}
控制台打印:
4.4 顺序消息
生产者设置队列选择器,需要将顺序消息放在同一个队列:
@Test
public void sendOrderlyMsg() {
// 设置队列的选择器
// 将需要顺序消费的消息存储到同一个队列中
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) {
String orderIdStr = (String)o; // 传入参数
Long orderId = Long.valueOf(orderIdStr);
int index = (int)(orderId % list.size());
return list.get(index);
}
});
List<OrderStep> orderSteps = OrderUtil.buildOrders();
// 发送消息
for (OrderStep step : orderSteps) {
Message<String> msg = MessageBuilder.withPayload(step.toString()).build();
rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot", msg, String.valueOf(step.getOrderId()));
}
}
消费者默认一个队列是线程并发消费,可以通过设置 consumeMode = ConsumeMode.ORDERLY
,将一个消息队列对应消费者的一个线程,以实现顺序消费:
@Component
@RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot", topic = "orderlyTopicBoot",
consumeMode = ConsumeMode.ORDERLY) // 设置一个队列对应一个线程
public class OrderlyTopicListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println("当前线程:" + Thread.currentThread().getId() + ",队列ID:" + messageExt.getQueueId() + ",消息内容:"
+ new String(messageExt.getBody(), Charset.defaultCharset()));
}
}
控制台打印:
4.5 延时消息
@Test
public void sendDelayMsg() {
SimpleDateFormat cusFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Message<String> msg = MessageBuilder.withPayload("发送延时消息,发送时间:" + cusFormat.format(new Date())).build();
// 设置延时等级3,这个消息将在 10s 之后发送(详看delayTimeLevel),消息在任务队列里存储,10s 后发送
// 3000 代表同步等待 3s,若超过 3s 消息队列都没有响应,自动断开链接
rocketMQTemplate.syncSend("helloTopicBoot", msg, 3000, 3);
}
4.6 消息过滤
4.6.1 Tag 过滤
生产者生产消息,Topic 和 Tag 以 “:” 分割( “:” 前后不能有空格)
@Test
public void sendTagFilterMsg() {
Message<String> msg1 = MessageBuilder.withPayload("消息A").build();
rocketMQTemplate.send("tagFilterBoot:TagA", msg1);
Message<String> msg2 = MessageBuilder.withPayload("消息B").build();
rocketMQTemplate.send("tagFilterBoot:TagB", msg2);
Message<String> msg3 = MessageBuilder.withPayload("消息C").build();
rocketMQTemplate.send("tagFilterBoot:TagC", msg3);
}
消费者设置过滤条件:
@Component
@RocketMQMessageListener(consumerGroup = "tagFilterConsumerBoot", topic = "tagFilterBoot",
selectorExpression = "TagA || TagC") // selectorExpression 过滤条件
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println("消息内容:" + new String(messageExt.getBody(), Charset.defaultCharset()));
}
}
控制台打印:
4.6.2 SQL92 过滤
使用 setHeader(String headerName, Object headerValue)
方法设置过滤条件:
@Test
public void sendSQL92FilterMsg() {
Message<String> msg1 = MessageBuilder.withPayload("美女A").setHeader("age", 22).setHeader("weight", 90).build();
rocketMQTemplate.send("SQL92FilterBoot", msg1);
Message<String> msg2 = MessageBuilder.withPayload("美女B").setHeader("age", 20).setHeader("weight", 100).build();
rocketMQTemplate.send("SQL92FilterBoot", msg2);
Message<String> msg3 = MessageBuilder.withPayload("美女C").setHeader("age", 25).setHeader("weight", 120).build();
rocketMQTemplate.send("SQL92FilterBoot", msg3);
}
消费者设置 selectorType
与 selectorExpression
:
@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterConsumerBoot", topic = "SQL92FilterBoot",
selectorType = SelectorType.SQL92, selectorExpression = "age<25 and weight>90") // 设置
public class Sql92FilterTopicListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println("消息内容:" + new String(messageExt.getBody(), Charset.defaultCharset()));
}
}
控制台打印:
注:在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf,具体参考 3.6.2
文章参考:Java微服务商城高并发秒杀项目实战|Spring Cloud Alibaba真实项目实战+商城双11秒杀+高并发+消息+支付+分布式事物Seata