SpringBoot整合RocketMQ
1、快速实战
按照SpringBoot三板斧,快速创建RocketMQ的客户端。创建Maven工程,引入关键依赖:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.5.9</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
使用SpringBoot集成时,要非常注意版本!!!
启动类
@SpringBootApplication
public class RocketMQSBApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQSBApplication.class,args);
}
}
配置文件:
rocketmq.name-server=192.168.65.112:9876
rocketmq.producer.group=springBootGroup
#如果这里不配,那就需要在消费者的注解中配。
#rocketmq.consumer.topic=
rocketmq.consumer.group=testGroup
server.port=9000
接下来就可以声明生产者,直接使用RocketMQTemplate进行消息发送。
package com.roy.rocketmq.basic;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author :
* @description:
**/
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
}
另外,这个rocketMQTemplate不光可以发消息,还可以主动拉消息。
消费者的声明也很简单。所有属性通过@RocketMQMessageListener注解声明。
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY,messageModel= MessageModel.BROADCASTING)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : "+ message);
}
}
这里唯一需要注意下的,就是消息了。SpringBoot框架中对消息的封装与原生API的消息封装是不一样的。
2、如何处理各种消息类型
1、各种基础的消息发送机制参见单元测试类:com.roy.rocketmq.SpringRocketTest
2、一个RocketMQTemplate实例只能包含一个生产者,也就只能往一个Topic下发送消息。如果需要往另外一个Topic下发送消息,就需要通过@ExtRocketMQTemplateConfiguration()注解另外声明一个子类实例。
3、对于事务消息机制,最关键的事务监听器需要通过@RocketMQTransactionListener注解注入到Spring容器当中。在这个注解当中可以通过rocketMQTemplateBeanName属性,指向具体的RocketMQTemplate子类。
3、实现原理
1、Push模式
Push模式对于@RocketMQMessageListener注解的处理方式,入口在rocketmq-spring-boot-2.2.2.jar中的org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration类中。
怎么找到的?评经验猜以及碰运气。
这个ListenerContainerConfiguration类继承了Spring当中的SmartInitializingSingleton接口,当Spring容器当中所有非懒加载的实例加载完成后,就会触发他的afterSingletonsInstantiated方法进行初始化。在这个方法中会去扫描所有带有注解@RocketMQMessageListener注解的类,将他注册到内部一个Container容器当中。
public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerContainer);
}
这里这个Container可以认为是客户端实例的一个容器,通过这个容器来封装RocketMQ的原生API。
registerContainer的方法挺长的,我这里截取出跟今天的主题相关的几行重要的源码:
private void registerContainer(String beanName, Object bean) {
.....
//获取Bean上面的注解
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
...
//检查注解的配置情况
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
//将扫描到的注解转化成为Container,并注册到上下文中。
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
//启动容器,这里就相当于是启动了消费者
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
这其中最关注的,当然是创建容器的createRocketMQListenerContainer方法中。而在这个方法中,你基本看不到RocketMQ的原生API,都是在创建并维护一个DefaultRocketMQListenerContainer对象。而这个DefaultRocketMQListenerContainer类,就是我们今天关注的重点。
DefaultRocketMQListenerContainer类实现了InitializingBean接口,自然要先关注他的afterPropertiesSet方法。这是Spring提供的对象初始化的扩展机制。
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: {}", messageType);
}
这个方法就是用来初始化RocketMQ消费者的。在这个方法里就会创建一个RocketMQ原生的DefaultMQPushConsumer消费者。同样,方法很长,抽取出比较关注的重点源码。
private void initRocketMQPushConsumer() throws MQClientException {
.....
//检查并创建consumer对象。
if (Objects.nonNull(rpcHook)) {
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
// 定制instanceName,有没有很熟悉!!!
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
.....
//设定广播消费还是集群消费。
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
//维护消费者的其他属性。
...
//指定Consumer的消费监听 --》在消费监听中就会去调用onMessage方法。
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
}
这整个就是在维护RocketMQ的原生消费者对象。其中的使用方式,其实有很多地方是很值得借鉴的,尤其是消费监听的处理。
2、Pull模式
Pull模式的实现其实是通过在RocketMQTemplate实例中注入一个DefaultLitePullConsumer实例来实现的。只要注入并启动了这个DefaultLitePullConsumer示例后,后续就可以通过template实例的receive方法,来调用DefaultLitePullConsumer的poll方法,主动去Pull获取消息了。
初始化DefaultLitePullConsumer的代码依然是在rocketmq-spring-boot-2.2.2.jar包中。不过处理类是org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration。这个配置类会配置在jar包中的spring.factories文件中,通过SpringBoot的自动装载机制加载进来。
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"}) //解析的springboot配置属性。
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
throws MQClientException {
RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
...
//创建消费者
DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
litePullConsumer.setNamespace(consumerConfig.getNamespace());
return litePullConsumer;
}
RocketMQUtil.createDefaultLitePullConsumer方法中,就是在维护一个DefaultLitePullConsumer实例。这个实例就是RocketMQ的原生API当中提供的拉模式客户端。
实际开发中,拉模式用得比较少。但是,其实RocketMQ针对拉模式也做了非常多的优化。原本提供了一个DefaultMQPullConsumer类,进行拉模式消息消费,DefaultLitePullConsumer在此基础上做了很多优化。有兴趣可以自己研究一下。
4.示例(RockerMQ整合springboot中的事务消息)
生产者:
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
//生产者:发送普通消息
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
//生产者:发送事务消息
public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
//尝试在Header中加入一些自定义的属性。
Message<String> message = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TRANSACTION_ID,"TransID_"+i)
//发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。
.setHeader(RocketMQHeaders.TAGS,tags[i % tags.length])
//MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。
.setHeader("MyProp","MyProp_"+i)
.build();
String destination =topic+":"+tags[i % tags.length];
//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
}
}
消费者:
/**
*
* @description: 事务消息监听器
* 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
* 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
* 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
**/
//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
//@RocketMQTransactionListener(rocketMQTemplateBeanName = "ExtRocketMQTemplate")
@RocketMQTransactionListener()
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId,msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = "+msg);
//转成RocketMQ的Message对象
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
String tags = message.getTags();
if(StringUtils.contains(tags,"TagA")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagB")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
//延迟检查的时间间隔要有点奇怪。
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
//这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = "+originalMessage);
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
if(StringUtils.contains(tags,"TagC")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagD")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
而对于可能存在不同的事务消息的情况,则需要重新申明一个template去处理消息:
如通过@ExtRocketMQTemplateConfiguration() 去申明一个template
@ExtRocketMQTemplateConfiguration()
//@ExtRocketMQConsumerConfiguration(topic="stock_consumer_group")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
那么在消费者端,就需要去指定需要哪个template去处理,如下:
/**
*
* @description: 事务消息监听器
* 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
* 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
* 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
**/
//@RocketMQTransactionListener()
//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
@RocketMQTransactionListener(rocketMQTemplateBeanName = "ExtRocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId,msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = "+msg);
//转成RocketMQ的Message对象
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
String tags = message.getTags();
if(StringUtils.contains(tags,"TagA")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagB")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
//延迟检查的时间间隔要有点奇怪。
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
//这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = "+originalMessage);
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
if(StringUtils.contains(tags,"TagC")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagD")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
测试用例:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringRocketTest {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Test
public void sendMessageTest(){
String springTopic="TestTopic";
//发送字符消息
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(
new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
//发送对象消息
rocketMQTemplate.asyncSend(springTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
@Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
});
//发送指定TAG的消息
rocketMQTemplate.convertAndSend(springTopic + ":tag0", "I'm from tag0"); // tag0 will not be consumer-selected
System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag0");
rocketMQTemplate.convertAndSend(springTopic + ":tag1", "I'm from tag1");
System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag1");
//同步发送消息并且返回一个String类型的结果。
String replyString = rocketMQTemplate.sendAndReceive(springTopic, "request string", String.class);
System.out.printf("send %s and receive %s %n", "request string", replyString);
//同步发送消息并且返回一个Byte数组类型的结果。
byte[] replyBytes = rocketMQTemplate.sendAndReceive(springTopic, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);
System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));
//同步发送一个带hash参数的请求(排序消息),并返回一个User类型的结果
User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
User requestUser2 = new User().setUserAge((byte) 9).setUserName("requestUserName");
User requestUser3 = new User().setUserAge((byte) 9).setUserName("requestUserName");
User requestUser4 = new User().setUserAge((byte) 9).setUserName("requestUserName");
User replyUser = rocketMQTemplate.sendAndReceive(springTopic, requestUser, User.class, "order-id");
User replyUser2 = rocketMQTemplate.sendAndReceive(springTopic, requestUser2, User.class, "order-id");
User replyUser3 = rocketMQTemplate.sendAndReceive(springTopic, requestUser3, User.class, "order-id");
User replyUser4 = rocketMQTemplate.sendAndReceive(springTopic, requestUser4, User.class, "order-id");
System.out.printf("send %s and receive %s %n", requestUser, replyUser);
//同步发送一个带延迟级别的消息(延迟消息),并返回一个泛型结果
ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(springTopic, "request generic",
new TypeReference<ProductWithPayload<String>>() {
}.getType(), 30000, 2);
System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
//异步发送消息,返回String类型结果。
rocketMQTemplate.sendAndReceive(springTopic, "request string", new RocketMQLocalRequestCallback<String>() {
@Override public void onSuccess(String message) {
System.out.printf("send %s and receive %s %n", "request string", message);
}
@Override public void onException(Throwable e) {
e.printStackTrace();
}
});
//异步发送消息,并返回一个User类型的结果。
rocketMQTemplate.sendAndReceive(springTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
@Override public void onSuccess(User message) {
System.out.printf("send user object and receive %s %n", message.toString());
}
@Override public void onException(Throwable e) {
e.printStackTrace();
}
}, 5000);
//发送批量消息
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
System.out.printf("--- Batch messages send result :" + sr);
}
}
四、RocketMQ最佳实践
1、合理分配Topic、Tag
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。
Kafka的一大问题是Topic过多,会造成Partition文件过多,影响性能。而RocketMQ中的Topic完全不会对消息转发性能有影响。但是Topic过多,还是会加大RocketMQ的元数据维护的性能消耗。所以,在使用时,还是需要对Topic进行合理的分配。
使用Tag区分消息时,尽量直接使用Tag过滤,不要使用复杂的SQL过滤。因为消息过滤机制虽然可以减少网络IO,但是毕竟会加大Broker端的消息处理压力。所以,消息过滤的逻辑,还是越简单越好。
2、使用Key加快消息索引
分配好Topic和Tag之后,自然就需要优化Key属性了,因为Key也可以参与消息过滤。通常建议每个消息要分配一个在业务层面的唯一标识码,设置到Key属性中。这有两个方面的作用:
一是可以配合Tag进行更精确的消息过滤。
另一个更重要的方面是,RocketMQ的Broker端会为每个消息创建一个哈希索引。应用可以通过topic、key来查询某一条历史的消息内容,以及消息在集群内的处理情况。在管理控制台就可以看到。为了减少哈希索引潜在的哈希冲突问题,所有官方建议,客户端要尽量保证key的唯一性。
3、关注错误消息重试
我们已经知道RocketMQ的消费者端,如果处理消息失败了,Broker是会将消息重新进行投送的。而在重试时,RocketMQ实际上会为每个消费者组创建一个对应的重试队列。重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的队列中。
多关注重试队列,可以及时了解消费者端的运行情况。这个队列中出现了大量的消息,就意味着消费者的运行出现了问题,要及时跟踪进行干预。
然后RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
重试次数 | 与上次重试的间隔时间 | 重试次数 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
这个重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这个重试时间可以将源码中的org.apache.rocketmq.example.quickstart.Consumer里的消息监听器返回状态改为RECONSUME_LATER测试一下。
重试次数:
如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。
然后关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。
配置覆盖:
消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。
4、手动处理死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
死信队列的名称是%DLQ%+ConsumGroup
死信队列的特征:
- 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
- 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
- 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
- 死信队列中的消息不会再被消费者正常消费。
- 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
注:默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。
5、消费者端进行幂等控制
在MQ系统中,对于消息幂等有三种实现语义:
- at most once 最多一次:每条消息最多只会被消费一次
- at least once 至少一次:每条消息至少会被消费一次
- exactly once 刚刚好一次:每条消息都只会确定的消费一次
这三种语义都有他适用的业务场景。
其中,at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。
而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
而这个exactly once是MQ中最理想也是最难保证的一种语义,需要有非常精细的设计才行。RocketMQ只能保证at least once,保证不了exactly once。所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性。
关于这个问题,官网上有明确的回答:
4. Are messages delivered exactly once?
RocketMQ ensures that all messages are delivered at least once. In most cases, the messages are not repeated.
但是,对于exactly once语义,阿里云上的商业版RocketMQ是明确有API支持的,至于如何实现的,就不得而知了。
消息幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
-
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
处理方式
从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。