RocketMq顺序消息
- 1.RocketMq 架构图
- 2.RocketMq顺序消息
- 2.1部分消息有序
- 2.1.1 生产者构建
- 2.1.2 生产者保证有序
- 2.1.3 消费者保证有序性
- 3.使用rocketmq-spring-boot-starter发送消息如何指定tag与key?
- 问题
- 1.MessagingException: sendDefaultImpl call timeout
1.RocketMq 架构图
2.RocketMq顺序消息
顺序消息分为全局有序消息和部分有序消息,全局有序消息是指一个topic下所有的消息都是有序的,而部分有序消息是指同一类型的消息有序,举个例子,如订单创建、订单支付、订单完成。同一个订单的这三种消息保证有序就可以了,订单之间可以不用有序,这就是部分有序。
2.1部分消息有序
2.1.1 生产者构建
1.依赖
<!-- 实现对 RocketMQ 的自动化配置 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
2.配置
rocketmq:
name-server: 10.10.10.130:9876
producer:
group: my-producer-group
send-message-timeout: 15000
3.代码
@Autowired
private RocketMQTemplate rocketMQTemplate;
rocketMQTemplate.syncSendOrderly(RealtimeSubscribe.SCADA_QUEUE_KEY,gn + "," + av,item);
2.1.2 生产者保证有序
要保证生产者发送消息的有序性,必须满足以下两个条件:
1、消息的发送必须是同步发送
如果是异步的,由于网络抖动等原因,有可能导致消息到达broker的顺序与发送不一致。使用spring RocketMQTemplate 类,可以使用sync开头的方法,表示同步发送。如下:
rocketMQTemplate.syncSendOrderly(RealtimeSubscribe.SCADA_QUEUE_KEY,gn + "," + av,item);
2、要保证顺序的消息必须发送到同一个queue里
在RocketMq的实现里,为了实现高并发,一个topic是可以设置为多个队列的,这多个队列有可能是在一个broker里,也有可能是在多个broker里。如果要保证顺序的消息被分别发到不同的队列,那有可能会被不同机器的不同线程同时消费,这就达不到顺序的要求。如何做到向同一个队列发送呢?队列选择它有一个hashkey的设计,只要保证需要顺序性的消息的hashkey一致,那么这些消息就会向同一个队列发送。如下代码:
第1步 传入hashKey
public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
return syncSendOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
}
第2步,根据hashKey 选择队列
public class SelectMessageQueueByHash implements MessageQueueSelector {
public SelectMessageQueueByHash() {
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value %= mqs.size();
return (MessageQueue)mqs.get(value);
}
}
在5月11号的时候修改发送代码如下,传入相同的hashKey之后,所有的消息便往同一个队列发送了。
rocketMQTemplate.syncSendOrderly(RealtimeSubscribe.SCADA_QUEUE_KEY,gn + "," + av,item);
实际上,采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,将可能导致同一个业务中的不同消息被发送到不同的队列中,导致同一个业务的不同消息被存入不同的队列中,短暂的造成部分消息无序。同样的,如果增加了服务器,那么也会造成短暂的造成部分消息无序。
2.1.3 消费者保证有序性
3.使用rocketmq-spring-boot-starter发送消息如何指定tag与key?
使用rocketmq-spring-boot-starter发送消息如何指定tag与key
问题
1.MessagingException: sendDefaultImpl call timeout
org.springframework.messaging.MessagingException: sendDefaultImpl call timeout; nested exception is org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:551)
at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:472)
at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:460)
at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:867)
at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:55)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:129)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:122)
at com.sx.mapi.subscribe.RealtimeSubscribe.onResponse(RealtimeSubscribe.java:100)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:499)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.sx.mapi.subscribe.RealtimeSubscribe$$EnhancerBySpringCGLIB$$1.onResponse(<generated>)
at com.magus.jdbc.net.OPSubscribe.onEvent(OPSubscribe.java:291)
Caused by: org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:667)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344)
at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:543)
... 20 more
解决方法:加上如下配置
rocketmq:
producer:
send-message-timeout: 15000