文章目录
- 顺序消息应用场景
- 消息组(MessageGroup)
- 顺序性
- 生产的顺序性
- MQ 存储的顺序性
- 消费的顺序性
- rocketmq-client-java 示例(gRPC 协议)
- 1. 创建 FIFO 主题
- 生产者代码
- 消费者代码
- 解决办法
- 解决后执行结果
- rocketmq-client 示例(Remoting 协议)
- 生产者
- MessageQueueSelector 详解
- 消费者
顺序消息应用场景
在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 RocketMQ 的顺序消息可以有效保证数据传输的顺序性。比如:同一个用户的操作,一定是先生成订单、再进行支付、扣减库存、生成物流信息等。
消息组(MessageGroup)
RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。
基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。
顺序性
RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。
生产的顺序性
生产的顺序性就是必须保证每个消息在生成时是顺序的,且顺序的发送到 MQ 服务器。要保证生产的顺序,需要满足以下条件
- 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
- 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
MQ 存储的顺序性
MQ 按顺序收到消息后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
- 相同消息组的消息按照先后顺序被存储在同一个队列。
- 不同消息组的消息可以混合在同一个队列中,且不保证连续。
消费的顺序性
消费的顺序性,是消费者在消费的时候要严格按照 MQ 中的存储顺序来执行。
- 消费者保证执行的顺序
- PushConsumer 类型消费者,RocketMQ 会保证消息按照存储顺序一条一条投递给消费者
- SimpleConsumer 类型消费者,需要业务实现方自行保证消费的顺序。消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。
- 重试策略
Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。
所以对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。
rocketmq-client-java 示例(gRPC 协议)
1. 创建 FIFO 主题
本示例,我们模拟多个用户的一系列操作,并多个消息组区分不同的顺序消息。要求每个用户的消息按顺序执行,不同用户的消息之间不做必要关联。
$> ./mqadmin updatetopic -n localhost:9876 -c DefaultCluster -t MY_FIFO_TOPIC -o true -a +message.type=FIFO
注意:这里比普通消息和顺序消息多了一个 -o 参数,表示 order 的意思。
生产者代码
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.io.IOException;
public class FifoProducerDemo {
public static void main(String[] args) throws ClientException, IOException {
// 用于提供:生产者、消费者、消息对应的构建类 Builder
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 构建配置类(包含端点位置、认证以及连接超时等的配置)
ClientConfiguration configuration = ClientConfiguration.newBuilder()
// endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081
.setEndpoints(MyMQProperties.ENDPOINTS)
.build();
// 构建生产者
Producer producer = provider.newProducerBuilder()
// Topics 列表:生产者和主题是多对多的关系,同一个生产者可以向多个主题发送消息
.setTopics("MY_FIFO_TOPIC")
.setClientConfiguration(configuration)
// 构建生产者,此方法会抛出 ClientException 异常
.build();
for(int i = 1; i <= 10;i++) {
String msgGroup = "user" ; // 表示有两个用户
String keys = "key_" + i;
// 构建消息类
Message message = provider.newMessageBuilder()
// 设置消息发送到的主题
.setTopic("MY_FIFO_TOPIC")
// 设置消息索引键,可根据关键字精确查找某条消息。其一般为业务上的唯一值。如:订单id
.setKeys(keys)
// 设置消息Tag,表示为创建订单
.setTag("ORDER_CREATE")
// 设置消息组
.setMessageGroup(msgGroup)
// 消息体,单条消息的传输负载不宜过大。所以此处的字节大小最好有个限制
.setBody(("{\"success\":true,\"msg\":\""+ msgGroup + ":" + keys +"\"}").getBytes())
.build();
// 发送消息(此处最好进行异常处理,对消息的状态进行一个记录)
try {
SendReceipt sendReceipt = producer.send(message);
System.out.println(keys);
System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
} catch (ClientException e) {
System.out.println("Failed to send message");
}
}
// 发送完,关闭生产者
// producer.close();
}
}
发送顺序消息时,消息一定要设置消息组,同一消息组的消息将会按服务器接收的顺序进行消费。
注:发送顺序消息前需要设置 NameServer 中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 为 true。特别是 orderMessageEnable 默认为 false。建议在启动 namesrv 的时候使用自定义配置,在自定义配置中配置选项为true即可。
# namesrv.conf 为我们自定义的配置文件 nohup sh bin/mqnamesrv -c conf/namesrv.conf &
消费者代码
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import java.nio.ByteBuffer;
import java.util.Collections;
public class FifoConsumerDemo {
public static void main(String[] args) throws ClientException {
// 用于提供:生产者、消费者、消息对应的构建类 Builder
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 构建配置类(包含端点位置、认证以及连接超时等的配置)
ClientConfiguration configuration = ClientConfiguration.newBuilder()
// endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081
.setEndpoints(MyMQProperties.ENDPOINTS)
.build();
// 设置过滤条件(这里为使用 tag 进行过滤)
String tag = "ORDER_CREATE";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 构建消费者
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(configuration)
// 设置消费者分组
.setConsumerGroup("MY_FIFO_GROUP")
// 设置主题与消费者之间的订阅关系
.setSubscriptionExpressions(Collections.singletonMap("MY_FIFO_TOPIC", filterExpression))
.setMessageListener(messageView -> {
System.out.println(messageView);
ByteBuffer rs = messageView.getBody();
byte[] rsByte = new byte[rs.limit()];
rs.get(rsByte);
System.out.println("Message body:" + new String(rsByte));
// 处理消息并返回消费结果。
System.out.println("Consume message successfully, messageId=" + messageView.getMessageId());
return ConsumeResult.SUCCESS;
}).build();
System.out.println(pushConsumer);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}
注:多验证几次后会发现,消费执行并没有严格的按照顺序执行,查找源码后发现,PushConsumer 的 builder 在构建 PushConsumer 的时候有个 Settings 对象,该对象的主题配置信息是从服务器获取,获取后有一个 isFifo 参数,此参数对应是否顺序消费,但是目前此值一直为false。此问题为消费者分组的问题,Remoting 协议方式无此问题,因为两种 Client 的实现是不一样的。
解决办法
在 MQ bin目录执行如下命令即可,具体的相关说明,我们将在后续章节中(《RocketMQ 消费者分类与分组》)详细说明。
$> ./mqadmin updateSubGroup -n 127.0.0.1:9876 -g MY_FIFO_GROUP -o true -c DefaultCluster
解决后执行结果
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000001, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543268, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_2], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000000, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543178, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_1], messageGroup=user2, deliveryTimestamp=null, properties={__SHARDINGKEY=user2}}
Message body:{"success":true,"msg":"user1:key_2"}
Message body:{"success":true,"msg":"user2:key_1"}
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000000
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000001
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000002, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543279, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_3], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
Message body:{"success":true,"msg":"user1:key_3"}
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000002
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000004, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543294, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_5], messageGroup=user2, deliveryTimestamp=null, properties={__SHARDINGKEY=user2}}
Message body:{"success":true,"msg":"user2:key_5"}
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000004
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000003, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543288, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_4], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000005, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543301, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_6], messageGroup=user2, deliveryTimestamp=null, properties={__SHARDINGKEY=user2}}
Message body:{"success":true,"msg":"user1:key_4"}
Message body:{"success":true,"msg":"user2:key_6"}
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000005
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000003
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000006, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543313, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_7], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
Message body:{"success":true,"msg":"user1:key_7"}
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000006
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000007, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543320, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_8], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
Message body:{"success":true,"msg":"user1:key_8"}
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000007
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000008, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543331, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_9], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
Message body:{"success":true,"msg":"user1:key_9"}
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000008
MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000009, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543340, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_10], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
Message body:{"success":true,"msg":"user1:key_10"}
Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000009
注意:user1 和 user2 的操作顺序是一致的。因为我们不需要保证 user1 的操作必须在 user2 之前,只需要保证他们各自的操作为顺序的就可以。
rocketmq-client 示例(Remoting 协议)
生产者
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.shaded.commons.lang3.RandomUtils;
import java.util.List;
public class FifoProducerDemo {
/**
* 生产者分组
*/
private static final String PRODUCER_GROUP = "FIFO_PRODUCER_GROUP";
/**
* 主题
*/
private static final String TOPIC = "MY_FIFO_TOPIC";
public static void main(String[] args) throws MQClientException {
/*
* 创建生产者,并使用生产者分组初始化
*/
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
/*
* NamesrvAddr 的地址,多个用分号隔开。如:xxx:9876;xxx:9876
*/
producer.setNamesrvAddr(MyMQProperties.NAMESRV_ADDR);
/*
* 发送消息超时时间,默认即为 3000
*/
producer.setSendMsgTimeout(3000);
/*
* 启动生产者,此方法抛出 MQClientException
*/
producer.start();
/*
* 发送消息
*/
for (int i = 1; i <= 10; i++) {
try {
Message msg = new Message();
msg.setTopic(TOPIC);
// 设置消息索引键,可根据关键字精确查找某条消息。
msg.setKeys("messageKey");
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
msg.setTags("ORDER_CREATE");
// 设置消息体
msg.setBody(("顺序消息" + i).getBytes());
// 这里 userId 取值为 1,2,3(模拟有3个用户的顺序操作)
int userId = RandomUtils.nextInt(1,4);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 这个arg就是对应userId
Integer userId = (Integer)arg;
// 我们按队列的数量,对每个user进行分组
int index = userId % mqs.size();
// 同一个user的消息放入同一个队列
return mqs.get(index);
}
},userId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息发送失败!i = " + i);
}
}
// 如果生产者不再使用,则调用关闭
// 异步发送消息注意:异步发送消息,建议此处不关闭或者在sleep一段时间后再关闭
// 因为异步 SendCallback 执行的时候,shutdow可能已经执行了,生产者被关闭了
// producer.shutdown();
}
}
MessageQueueSelector 详解
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
mqs:队列列表,我们前面说了,默认 8 个队列
msg:当前消息
arg:为我们 send 方法传的第三个参数,示例中就是 userId
MessageQueueSelector 意为队列选择器,Remoting 协议客户端中没有 消息组的概念,所以需要我们手动的为消息进行分组(将需要严格顺序的消息放在同一个队列),这个接口就是完成此任务的,而且分组的逻辑需要我们自己实现。实际应用中我们可以使用 用户id、订单id等来为顺序消息分组。
消费者
import com.yyoo.mq.rocket.MyMQProperties;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class FifoConsumerDemo {
public static void main(String[] args) throws MQClientException {
// 初始化 consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("REMOTING_FIFO_CONSUMER_GROUP");
// 设置 namesrv 地址
consumer.setNamesrvAddr(MyMQProperties.NAMESRV_ADDR);
// 设置从开头开始读取消息
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置订阅的主题,以及过滤tag
consumer.subscribe("MY_FIFO_TOPIC", "ORDER_CREATE || TagA || TagD || messageTag");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for(MessageExt msg : msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
注意:顺序消息消费者的监听类型为 MessageListenerOrderly ,注意与我们前面的示例 MessageListenerConcurrently 进行区分。