当搭建IoT管理后台后,APP、设备、云端三端就可以实现交互;当点击APP中的控制按钮,其控制指令就可以经过云端转发到设备执行,当设备执行后将设备的状态上报到云端,APP通过轮训可以取到设备此时的状态,在控件上展示控制结果。
以上的控制流程属于即时控制,便于理解。在人们使用设备的场景中,存在一种定时/延时的场景,比如这种家里的烟机延时3分钟后再关机,那定时/延时场景是如何实现的呢?有什么较好的方式来实现?
目前有两种方式进行实现:
- 传统方式
- 使用消息中间件
传统方法
其工作原理是在APP端设置一个未来的时间戳和运行的周期,这些表示未来时间的数据和控制指令会被放入云端数据库存储起来,云端会启动定时任务,比如每隔5s去扫描数据库数据,如果时间满足,就会将设置的控制命令下发到设备,从而实现了设备控制。
以上的方式属于传统基于数据库的定时调度方案,在分布式场景下,性能不高,定时精度不高,实现复杂。因此,接下来提出基于RocketMQ的定时消息实现定时调度任务。
RocketMQ定时任务
首先展示一下,RocketMQ实现延时消息的流程图:
如图所示,用户在APP设置的控制指令和定时时间会发送到RocketMQ队列中存储起来,当时间到指定时间后,作为消费者的推送平台才能把控制指令推送给设备。这种基于成熟的消息中间件的方式有着以下的优势:
- 精度高、开发门槛低
- 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。 Apache RocketMQ 的定时消息具有高并发和水平扩展的能力。
接下来以代码的方式介绍一下如何实现消息延时
要在SpringBoot中使用RocketMQ进行延时消息发送,需要使用RocketMQ的定时消息功能,一共4步。
- 在pom.xml添加依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter.version}</version>
</dependency>
- 在application.properties文件中添加以下配置:
rocketmq.name-server=your-nameserver-ip:9876
rocketmq.producer.group=your-group-name
- 创建一个RocketMQ生产者类:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendDelayMessage(String topic, Object message, long timeout, int delayLevel) {
// 封装消息
Message<Object> msg = MessageBuilder.withPayload(message).build();
// 发送消息
rocketMQTemplate.syncSend(topic, msg, timeout, delayLevel);
}
}
如上在代码中添加了rocketMQTemplate.syncSend(topic, msg, timeout, delayLevel); 这句代码设置了delayLevel。
其延时level表默认配置如下:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
解释:
level有以下三种情况:
level == 0,消息为非延迟消息;
1<=level<=maxLevel,消息延迟特定时间,例如level1,延迟1s;
level > maxLevel,则level maxLevel,例如level==20,延迟2h。
- 在需要发送延时消息的地方调用sendDelayMessage方法:
rocketMQProducer.sendDelayMessage("your-topic-name", "your-message", "timeout", 3);
这句代码设置了delayLevel=3,对应messageDelayLevel中的10s,表示消息需要延时10s才能消费。
综上所述,以上介绍了RocketMQ实现延时消息的实现方法,在实现延时的时候,需要根据业务提前确认好delayLevel,该方式也常用于延时关闭支付订单。给个思考,如果想自定义延时失败,比如33分钟,代码应该如何改呢?