rocketmq也有延迟消息,经典的应用场景:订单30分钟未支付,则取消的场景
其他博客提到从rocketmq5.0开始,支持自定义延迟时间,4.x只支持预定义延迟时间,安装rocketmq可参考RocketMq简介及安装、docker安装rocketmq、安装rocketmq可视化管理端
引入rocketmq5.x对应的客户端
引入支持rocketmq5.x的rocketmq-spring
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
application.yml配置
rocketmq:
name-server: ip:9876
生产者
延迟方法有2种,
- 延迟x时间执行,
syncSendDelayTimeMills
,延迟时间单位是毫秒、syncSendDelayTimeSeconds
,延迟时间单位是秒
- 指定未来的某个时间点执行
syncSendDeliverTimeMills
,某个时间点的时间戳
在实践中,我选择了延迟x时间执行
,因为在测试时发现,如果应用宿主机的时间与rocketmq宿主机的时间不同步时会出现问题,比如应用宿主机时间比rocketmq宿主机时间晚了2分钟,发送了个未来2分钟执行的消息,因为宿主机时间差原因,可能会刚发送就会立刻执行。
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@Autowired
private RocketMQTemplate rocketMQTemplate;
pubic void sendDelayMsg(){
//延迟时间,单位毫秒
Integer delay = 5000;
String msgBody= JSONObject.toJSONString(someObject);
rocketMQTemplate.syncSendDelayTimeMills(
"myTopic:myTag",
msgBody,
delay);
}
消费者
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
@RocketMQMessageListener(topic = "myTopic",
selectorExpression = "myTag",
consumerGroup = "myConsumerTag")
public class MyMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String msgBody) {
log.info("收到的消息是:{}", msgBody);
final SomeClass someObj = JSONObject.parseObject(msgBody, SomeClass.class);
}
}