目录
RocketMQ是什么?
RocketMQ的主要特点包括
RocketMQ 的消息模型
队列模型/点对点
发布/订阅模型
RocketMQ 的消息模型
RocketMQ中重要的组件
RocketMQ消费模式
RoctetMQ基本架构
springboot实战+讲解
基本样例
单向发送
异步发送
同步发送
推模式
拉模式-随机获取一个queue
拉模式-指定获取一个queue
顺序消息
顺序消息生产者
顺序消息消费者
广播消息
广播消息消费模式
延迟消息
预定日程生产者
预定日程消费者
批量消息
批量消息
分批发送批量消息
过滤消息
过滤消息-tag过滤消费者
过滤消息-SQL过滤生产者
过滤消息-SQL过滤消费者
事务消息
事务消息生产者
本地事务监听器
RocketMQ使用中常见的问题
RocketMQ如何保证消息顺序
RocketMQ的事务消息原理
RocketMQ是什么?
RocketMQ是一种开源的分布式消息中间件系统,最初由阿里巴巴开发并开源。它是一种快速、可靠、可伸缩的消息队列系统,旨在解决分布式系统中的异步通信和数据传输问题。
RocketMQ的主要特点包括
1. 高吞吐量:RocketMQ具有出色的性能表现,能够处理大规模消息的传输和处理。它通过支持并行化和异步传输等机制来实现高吞吐量。
2. 可靠性:RocketMQ在消息传输过程中提供了可靠性保证。它采用了主从复制的方式来确保消息的高可用性,并提供了严格的消息顺序传输保证。
3. 分布式支持:RocketMQ是为分布式系统设计的,它可以在多个节点上进行部署,并支持横向扩展。它提供了自动的负载均衡和故障转移机制,以实现高可用性和可伸缩性。
4. 灵活的消息模型:RocketMQ支持多种消息模型,包括发布/订阅模型和点对点模型。这使得开发人员可以根据实际需求选择适合的消息传输方式。
5. 实时性:RocketMQ具有低延迟和高实时性的特点,适用于需要快速响应的场景,如实时数据分析、流式处理等。
6. 可管理性:RocketMQ提供了可视化的管理工具和监控指标,方便管理员对消息队列进行配置、监控和管理。
7. 丰富的特性:RocketMQ还提供了许多其他功能,如事务消息、延迟消息、批量消息发送、消息过滤等,以满足不同场景下的需求。
总的来说,RocketMQ是一个功能强大的消息中间件系统,具备高吞吐量、可靠性、分布式支持和灵活的消息模型等特点,适用于构建可靠的分布式系统和实时数据处理系统。
RocketMQ 的消息模型
RocketMQ的消息模型是基于发布/订阅模式和点对点模式的混合模型。
消息队列有两种消息模型,分别是队列模型(点对点)和发布/订阅模型。
队列模型/点对点
队列模型是最开始的一种消息队列模型,对应着消息队列“发-存-收”的模型。生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,也就是说每条消息只能被一个消费者消费。
发布/订阅模型
如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。于是,发布/订阅模型就诞生了。
在发布-订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。
RocketMQ 的消息模型
RocketMQ中重要的组件
- Producer(生产者):Producer负责将消息发送到RocketMQ的Broker,它将消息封装成消息对象并指定消息的主题(Topic)。生产者可以根据需要选择同步发送或异步发送消息。
- Consumer(消费者):Consumer从RocketMQ的Broker订阅消息,并对消息进行消费处理。消费者可以按照订阅的主题(Topic)和标签(Tag)来过滤需要的消息。
- Broker(消息代理):Broker是RocketMQ的核心组件,负责存储和转发消息。Broker接收生产者发送的消息,并将其存储在内部的存储引擎中,等待消费者订阅并消费。Broker还负责处理消费者的消费进度和消息的复制和同步。
- Name Server(命名服务器):Name Server是RocketMQ集群的管理节点,它负责管理和维护整个RocketMQ集群的元数据信息。Producer和Consumer通过Name Server来发现Broker的位置和状态信息。
- Topic(主题):主题是消息的逻辑分类,Producer将消息发送到特定的主题,而Consumer可以订阅并消费特定的主题。一个主题可以有多个消息队列,每个消息队列在一个Broker上。
- Message Queue(消息队列):消息队列是RocketMQ中消息的存储和传输单位。每个主题可以被分为多个消息队列,每个消息队列在一个Broker上,可以并行地处理消息。
- Consumer Group(消费者组):消费者组是一组具有相同消费逻辑的Consumer实例的集合。在一个消费者组中,每个消息队列只能由一个Consumer实例进行消费,但一个Consumer组可以有多个Consumer实例,从而实现负载均衡和高可用性。
- Offset:在Topic的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要RocketMQ为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。也可以这么说,
Queue
是一个长度无限的数组,Offset 就是下标。
RocketMQ消费模式
消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。
默认情况下就是集群消费,这种模式下一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费
,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
而广播消费消息会发给消费者组中的每一个消费者进行消费。
RoctetMQ基本架构
RocketMQ 一共有四个部分组成:NameServer,Broker,Producer 生产者,Consumer 消费者,它们对应了:发现、发、存、收,为了保证高可用,一般每一部分都是集群部署的。
springboot实战+讲解
实战之前,我们需要先搭建一个基于Maven的springboot项目,只需要加入以下依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.6</version>
</dependency>
基本样例
消息生产者分别通过三种方式发送消息:
- 同步发送:等待消息返回后再继续进行下面的操作。
- 异步发送:不等待消息返回直接进入后续流程。broker将结果返回后调用callback函数,并使用CountDownLatch计数。
- 单向发送:只负责发送,不管消息是否发送成功。
单向发送
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* 单向发送
*/
public class OnewayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("Simple","TagA", "Simple-Oneway".getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
System.out.printf("%d 消息发送完成 %n" , i);
}
Thread.sleep(5000);
producer.shutdown();
}
}
异步发送
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 异步发送
*/
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
CountDownLatch countDownLatch = new CountDownLatch(100);//计数
for (int i = 0; i < 10; i++) {
Message message = new Message("Simple", "TagA", "Simple-Async".getBytes(StandardCharsets.UTF_8));
final int index = i;
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%d 消息发送成功%s%n", index, sendResult);
}
@Override
public void onException(Throwable throwable) {
countDownLatch.countDown();
System.out.printf("%d 消息失败%s%n", index, throwable);
throwable.printStackTrace();
}
}
);
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
同步发送
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* 同步发送
*/
public class SyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message msg = new Message("Simple", //主题
"TagA", //设置消息Tag,用于消费端根据指定Tag过滤消息。
"Simple-Sync".getBytes(StandardCharsets.UTF_8) //消息体。
);
SendResult send = producer.send(msg);
System.out.printf(i + ".发送消息成功:%s%n", send);
}
producer.shutdown();
}
}
消费者消费消息分两种:
- 拉模式:消费者主动去Broker上拉取消息。
- 推模式:消费者等待Broker把消息推送过来。
推模式
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 推模式
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.subscribe("Simple","*");
pushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( n->{
System.out.printf("收到消息: %s%n" , n);
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.start();
System.out.printf("Consumer Started.%n");
}
}
拉模式-随机获取一个queue
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 拉模式-随机获取一个queue
*/
public class PullLiteConsumer {
public static void main(String[] args) throws MQClientException {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("SimpleLitePullConsumer");
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe("Simple","");
litePullConsumer.start();
while (true) {
List<MessageExt> poll = litePullConsumer.poll();
System.out.printf("消息拉取成功 %s%n" , poll);
}
}
}
拉模式-指定获取一个queue
public class PullLiteConsumerAssign {
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("SimpleLitePullConsumer");
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.start();
Collection<MessageQueue> messageQueues = litePullConsumer.fetchMessageQueues("Simple");
List<MessageQueue> list = new ArrayList<>(messageQueues);
litePullConsumer.assign(list);
litePullConsumer.seek(list.get(0), 10);
try {
while (true) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
顺序消息
顺序消息指生产者局部有序发送到一个queue,但多个queue之间是全局无序的。
- 顺序消息生产者样例:通过MessageQueueSelector将消息有序发送到同一个queue中。
- 顺序消息消费者样例:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取(通过加锁的方式实现)。
顺序消息生产者
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* 顺序消息生产者
* 通过MessageQueueSelector将消息有序发送到同一个queue中。
*/
public class OrderProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 10; i++) {
Message message = new Message("OrderTopic", "TagA",
("order_" + j + "_step_" + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
int index = id % list.size();
return list.get(index);
}
}, j,30000);
System.out.printf("%s%n", sendResult);
}
}
producer.shutdown();
}
}
顺序消息消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 顺序消息消费者
* 通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取(通过加锁的方式实现)。
*/
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("OrderTopic","*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
list.forEach(n->{
System.out.println("QueueId:"+n.getQueueId() + "收到消息内容 "+new String(n.getBody()));
});
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
广播消息
广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。
- MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
- MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。
广播消息消费模式
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* 广播消息消费模式
* 广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。
* ● MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
* ● MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。
*/
public class BroadcastConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadCastConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Simple","*");
consumer.setMessageModel(MessageModel.BROADCASTING); //广播模式
// consumer.setMessageModel(MessageModel.CLUSTERING);//集群模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(n->{
System.out.println("QueueId:"+n.getQueueId() + "收到消息内容 "+new String(n.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
延迟消息
延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。
- message.setDelayTimeLevel(3):预定日常定时发送。1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;可以在dashboard中broker配置查看。
- msg.setDelayTimeMs(10L):指定时间定时发送。默认支持最大延迟时间为3天,可以根据broker配置:timerMaxDelaySec修改。(5.0才有,这里不做展示了)
预定日程生产者
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.time.LocalTime;
/**
* 预定日程定时发送
*/
public class ScheduleProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message msg = new Message("Schedule", //主题
"TagA", //设置消息Tag,用于消费端根据指定Tag过滤消息。
"ScheduleProducer".getBytes(StandardCharsets.UTF_8) //消息体。
);
//1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);
producer.send(msg,30000);
System.out.printf(i + ".发送消息成功:%s%n", LocalTime.now());
}
producer.shutdown();
}
}
预定日程消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
/**
* 预定日程消费者
*/
public class ScheduleConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.subscribe("Schedule","*");
pushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( n->{
System.out.printf("接收时间:%s %n", LocalTime.now());
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.start();
System.out.printf("Simple Consumer Started.%n");
}
}
批量消息
批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
批量消息的使用限制:
- 消息大小不能超过4M,虽然源码注释不能超1M,但是实际使用不超过4M即可。平衡整体的性能,建议保持1M左右。
- 相同的Topic,
- 相同的waitStoreMsgOK
- 不能是延迟消息、事务消息等
批量消息
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
/**
* 批量发送消息
*/
public class BatchProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
ArrayList<Message> list = new ArrayList<>();
list.add(new Message("simple","TagA", "BatchProducer0".getBytes(StandardCharsets.UTF_8)));
list.add(new Message("simple","TagA", "BatchProducer1".getBytes(StandardCharsets.UTF_8)));
list.add(new Message("simple","TagA", "BatchProducer2".getBytes(StandardCharsets.UTF_8)));
SendResult send = producer.send(list);
System.out.printf(".发送消息成功:%s%n", send);
producer.shutdown();
}
}
分批发送批量消息
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* 分批批量发送消息
* 注意修改SIZE_LIMIT为 = 10 * 1000,不然发送消息时会提示消息体积过大
*/
public class SplitBatchProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SplitBatchProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
ArrayList<Message> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
list.add(new Message("simple","TagA", ("SplitBatchProducer"+i).getBytes(StandardCharsets.UTF_8)));
}
ListSplitter splitter = new ListSplitter(list);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
SendResult sendResult = producer.send(listItem,30000);
System.out.printf(".发送消息成功:%s%n", sendResult);
}
producer.shutdown();
}
}
class ListSplitter implements Iterator<List<Message>> {
private static final int SIZE_LIMIT = 10 * 1000; // 每个消息批次的最大大小
private final List<Message> messages; // 待发送的消息列表
private int currIndex; // 当前拆分到的位置
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20;
// 如果超过了单个批次所允许的大小,就将此消息之前的消息作为下一个子列表返回
if (tmpSize > SIZE_LIMIT) {
// 如果是第一条消息就超出大小限制,就跳过这条消息再继续扫描
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
// 如果当前子列表大小已经超出所允许的单个批次大小,那么就暂停添加消息
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
// 返回从currIndex到nextIndex之间的所有消息
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
过滤消息
使用Tag方式过滤(通过consumer.subscribe("TagFilterTest", "TagA || TagC")实现):
在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* 过滤消息-tag过滤生产者
*/
public class TagFilterProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA","TagB","TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("FilterTopic", //主题
tags[i % tags.length], //设置消息Tag,用于消费端根据指定Tag过滤消息。
("TagFilterProducer_"+tags[i % tags.length]).getBytes(StandardCharsets.UTF_8) //消息体。
);
SendResult send = producer.send(msg);
System.out.printf(i + ".发送消息成功:%s%n", send);
}
producer.shutdown();
}
}
过滤消息-tag过滤消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 过滤消息-tag过滤消费者
*/
public class TagFilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.subscribe("FilterTopic","TagA || TagC");
pushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( n->{
System.out.printf("收到消息: %s%n" , new String(n.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.start();
System.out.printf("TagFilter Consumer Started.%n");
}
}
使用Sql方式过滤(通过MessageSelector.bySql(String sql)参数实现):
Tag是RocketMQ中特有的一个消息属性。
RocketMQ的最佳实践中就建议使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用Tag来区分。
Tag方式有一个很大的限制,就是一个消息只能有一个Tag,这在一些比较复杂的场景就有点不足了。 这时候可以使用SQL表达式来对消息进行过滤。
过滤消息-SQL过滤生产者
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* 过滤消息-SQL过滤生产者
*/
public class SqlFilterProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA","TagB","TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("FilterTopic", //主题
tags[i % tags.length], //设置消息Tag,用于消费端根据指定Tag过滤消息。
("TagFilterProducer_"+tags[i % tags.length] + "_i_" + i).getBytes(StandardCharsets.UTF_8) //消息体。
);
msg.putUserProperty("lasse", String.valueOf(i));
SendResult send = producer.send(msg);
System.out.printf(i + ".发送消息成功:%s%n", send);
}
producer.shutdown();
}
}
过滤消息-SQL过滤消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 过滤消息-SQL过滤消费者
*/
public class SqlFilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.subscribe("FilterTopic", MessageSelector.bySql("(TAGS is not null And TAGS IN ('TagA','TagC'))"
+ "and (lasse is not null and lasse between 0 and 3)"));
pushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( n->{
System.out.printf("收到消息: %s%n" , new String(n.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.start();
System.out.printf("SqlFilter Consumer Started.%n");
}
}
SQL92语法:
RocketMQ只定义了一些基本语法来支持这个特性。我们可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL ,IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
使用注意:
- 只有推模式的消费者可以使用SQL过滤。拉模式是用不了的;
- 另外消息过滤是在Broker端进行的,提升网络传输性能,但是broker服务会比较繁忙。(consumer将过滤条件推送给broker端)
事务消息
事务消息生产者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
/**
* 事务消息生产者
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("TransProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
//使用executorService异步提交事务状态,从而提高系统的性能和可靠性
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
//本地事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
producer.start();//启动消息生产者同时半事务发送1.开启事务
try {
Message message = new Message("TransactionTopic", null,
("A向B系统转100块钱").getBytes(StandardCharsets.UTF_8));
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
System.out.printf("%s%n", result.getSendStatus());//半事务消息是否成功
} catch (Exception e) {
//回滚操作
}
Thread.sleep(100000);//等待broker端回调
producer.shutdown();
}
}
本地事务监听器
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 本地事务监听器
*/
public class TransactionListenerImpl implements TransactionListener {
@Override
/**
* 在提交完事务消息后执行。
* 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
* 返回ROLLBACK_MESSAGE状态的消息会被丢弃。
* 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
*/
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//一端代码疯狂操作
switch (0) {
case 0:
//情况一:本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
case 1:
//情况二:本地事务失败
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
//情况三:业务复杂,中间过程或者依赖其他操作的返回结果,丢进事务回查(checkLocalTransaction方法↓)
return LocalTransactionState.UNKNOW;
}
}
//事务回查,默认是60s检查一次
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//打印每次回查时间
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sdf.format(new Date()));
//一端代码疯狂操作
switch (0) {
case 0:
//情况一:本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
case 1:
//情况二:本地事务失败
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
//情况三:业务复杂,中间过程或者依赖其他操作的返回结果,丢进事务回查(checkLocalTransaction方法↓)
return LocalTransactionState.UNKNOW;
}
}
}
事务消息消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 消费者使用的尽最大可能性确保成功消费(重试机制+死信队列特殊处理),所以B系统的处理比较简单,开启事务确保消费成功即可。
* @author Lasse
*/
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TransactionTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try{
//开启事务
for (MessageExt msg : msgs) {
//执行本地事务
System.out.println("一顿操作幂等性代码");
//本地事务成功
System.out.println("commit"+msg.getTransactionId());
}
}catch (Exception e){
e.printStackTrace();
System.out.println("本地事务失败,重试消费");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
}
RocketMQ使用中常见的问题
我们将消息流程分为三大部分,每一部分都有可能会丢失数据。
- 生产阶段:Producer通过网络将消息发送给Broker,这个发送可能会发生丢失。比如网络延迟不可达等。
- 存储阶段:Broker肯定是先把消息放到内存的,然后根据刷盘策略持久化到硬盘中。刚收到Producer的消息,放入内存,但是异常宕机了,导致消息丢失。
- 消费阶段:消费失败。比如先提交ack再消费,处理过程中出现异常,该消息就出现了丢失。
解决方案:
- 生产阶段:使用同步发送失败重试机制;异步发送重写回调方法检查发送结果;Ack确认机制。
- 存储阶段:同步刷盘机制;集群模式采用同步复制。
- 消费阶段:正常消费处理完成才提交ACK;如果处理异常返回重试标识。
RocketMQ如何保证消息顺序
RocketMQ架构本身是无法保证消息有序的,但是提供了相应的API保证消息有序消费。RocketMQ API利用FIFO先进先出的特性,保证生产者消息有序进入同一队列,消费者在同一队列消费就能达到消息的有序消费。
- 创建顺序消息:首先,Producer需要确保按照业务需求将消息发送为顺序消息。这意味着在发送消息时,需要指定相同的消息队列选择策略,并保证相同的业务键(Business Key)或消息队列选择器(Message Queue Selector)。
- 定义消息队列选择策略:RocketMQ提供了多种消息队列选择策略,用于确定消息发送到哪个队列。其中,按照业务键或消息队列选择器来选择消息队列是实现顺序消费的关键。你可以自定义消息队列选择器来实现自定义的顺序规则。
- 设置消费模式:对于顺序消费,Consumer需要设置为集群模式(Cluster Mode),而不是广播模式(Broadcast Mode)。在集群模式下,同一消费者组中的每个Consumer实例只会消费特定队列的消息,从而实现顺序消费。
- 注册顺序消费监听器:Consumer需要注册顺序消费的监听器(MessageListener),并实现相应的逻辑处理。监听器将按照顺序接收到的消息进行消费,并保证消息的有序性。
需要注意的是,顺序消费在一些特殊情况下可能会受到限制。例如,如果Broker节点发生故障导致消息迁移或重新负载,可能会破坏消息的严格顺序。此外,由于RocketMQ的分布式特性,顺序消费也会受到网络延迟和负载均衡等因素的影响。因此,在设计应用程序时,需要权衡顺序消费的需求和系统的实际情况。
RocketMQ的事务消息原理
RocketMQ 的事务消息是一种保证消息可靠性的机制。在RocketMQ中,事务消息的实现原理主要是通过两个发送阶段和一个确认阶段来实现的。
- 发送消息的预处理阶段:在发送事务消息之前,RocketMQ 会将消息的状态设置为“Preparing”,并将消息存储到消息存储库中。
- 执行本地事务:当预处理阶段完成后,消息发送者需要执行本地事务,并返回执行结果(commit 或 rollback)。
- 消息的二次确认阶段:根据本地事务的执行结果,如果是 commit,则 RocketMQ 将消息的状态设置为“Committing”;否则将消息的状态设置为“Rollback”。
- 完成事务:最后在消息的消费者消费该消息时,RocketMQ 会根据消息的状态来决定是否提交该消息。如果消息的状态是“Committing”,则直接提交该消息;否则忽略该消息。
需要注意的是,如果在消息发送的过程中出现异常或者网络故障等问题,RocketMQ 会触发消息回查机制。在回查过程中,RocketMQ 会调用消息发送方提供的回查接口来确认事务的提交状态,从而解决消息投递的不确定性。