随着智能技术的发展,市场上出现了很多的智能设备,其具有连接网络的能力。用户可以实现远程控制,并且设备也可上报自己的状态,实现云端对设备的运行情况分析。在某些情况下需要保证设备上报状态的有序性,例如传感器数据采集和处理,其中数据的顺序很重要,因为它们可能表示实时的物理过程或事件。为了能实现消息顺序消费,可以用什么办法实现呢?
顺序消息是RocketMQ
提供的一种高级消息类型,消费者按照发送消息的顺序性去处理消息。即在进行设备状态上报的时候消息发送到消息队列里面时就要先保持有序,之后进行消息处理时才能获取有序的消息。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
如上图所示,在设备端采集设备的状态,放到MQ进行存储起来,起到缓冲作用,之后再去消费消息。
因此,生产者进行发送消息时,就需要保持消息的有序性。
为保证状态推送的有效性,需要指定消息的关键字,其中设备ID是唯一标识设备,因此同一个设备上报的状态就被同一个队列中,这样就能保持统一设备的状态是有序的。
其代码示例如下:
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者实例
producer.start();
// 设置消息队列选择器
producer.setQueueSelector(new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 获取设备 ID
String deviceId = (String) arg;
// 计算队列索引
int index = Math.abs(deviceId.hashCode()) % mqs.size();
// 返回对应的消息队列
return mqs.get(index);
}
});
// 发送消息,模拟100个设备上报状态
for (int i = 0; i < 100; i++) {
// 构建消息体
Message msg = new Message("device_status_topic", "device_status_tag", ("Device status " + i).getBytes());
// 设置设备 ID 作为消息队列选择器的参数
String deviceId = "device_" + i % 10;
SendResult sendResult = producer.send(msg, deviceId);
System.out.println(sendResult);
}
// 关闭生产者实例
producer.shutdown();
}
}
如上述的代码所示,首先创建了生产者,定义了MQ服务器地址,启动的生产者实例;接着以设备ID为参数,以取模的方式进行哈希计算,计算出该设备属于哪个队列,之后对应该设备ID的消息都会被发送到相同的队列。接着使用for循环,模拟100个设备上报,首先生成消息,然后使用send方法进行推送到指定的队列。
在消息生产的时候保持了有序性,为了实现消息消费的有序性,消费消息时需要严格按照接收—处理—应答的语义处理消息。
其代码示例如下:
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题和标签
consumer.subscribe("device_status_topic", "device_status_tag");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 遍历消息列表
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
// 返回消费状态
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.println("Consumer started");
// 等待一段时间后关闭消费者实例
Thread.sleep(60000);
consumer.shutdown();
}
}
在代码中,创建了消费组实例,设置了MQ服务的地址,订阅了指定的主题和标签。其中较为重要的一步是注册消息监听器,在该方法中,使用了顺序消费,即在队列里面的消息会根据存储的先后顺序推送给消费者,进行消费。如果消费不成功,会重新将该消息放到队列的头部,重新推送。
需要注意的是,在RocketMQ中,同一消息队列上的消息是有序的,但不同消息队列之间的消息是无序的。因此,我们需要通过消息队列选择器来确保同一设备的消息发送到同一消息队列上,从而实现对同一设备的消息顺序消费。
如上介绍了如何实现消息推送的有序性,其核心原则:先发送的先消费、后发送的后消费。生产者通过设置消息队列选择器来实现消息的顺序生产。消费者通过注册消息监听器来实现消息的顺序消费。