## 如何顺序处理设备上报的数据

news2024/12/23 20:38:57
1. 引言

随着智能技术的发展,市场上出现了很多的智能设备,其具有连接网络的能力。用户可以实现远程控制,并且设备也可上报自己的状态,实现云端对设备的运行情况分析。在某些情况下需要保证设备上报状态的有序性,例如传感器数据采集和处理,其中数据的顺序很重要,因为它们可能表示实时的物理过程或事件。为了能实现消息顺序消费,可以用什么办法实现呢?

2. 消息队列实现消息的顺序消费

顺序消息是RocketMQ提供的一种高级消息类型,消费者按照发送消息的顺序性去处理消息。即在进行设备状态上报的时候消息发送到消息队列里面时就要先保持有序,之后进行消息处理时才能获取有序的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传在这里插入图片描述

如上图所示,在设备端采集设备的状态,放到MQ进行存储起来,起到缓冲作用,之后再去消费消息。

因此,生产者进行发送消息时,就需要保持消息的有序性。

3. 状态上报的有序性

为保证状态推送的有效性,需要指定消息的关键字,其中设备ID是唯一标识设备,因此同一个设备上报的状态就被同一个队列中,这样就能保持统一设备的状态是有序的。

其代码示例如下:

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者实例
        producer.start();
        
        // 设置消息队列选择器
        producer.setQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // 获取设备 ID
                String deviceId = (String) arg;
                // 计算队列索引
                int index = Math.abs(deviceId.hashCode()) % mqs.size();
                // 返回对应的消息队列
                return mqs.get(index);
            }
        });

        // 发送消息,模拟100个设备上报状态
        for (int i = 0; i < 100; i++) {
            // 构建消息体
            Message msg = new Message("device_status_topic", "device_status_tag", ("Device status " + i).getBytes());
            // 设置设备 ID 作为消息队列选择器的参数
            String deviceId = "device_" + i % 10;
            SendResult sendResult = producer.send(msg, deviceId);
            System.out.println(sendResult);
        }

        // 关闭生产者实例
        producer.shutdown();
    }
}

如上述的代码所示,首先创建了生产者,定义了MQ服务器地址,启动的生产者实例;接着以设备ID为参数,以取模的方式进行哈希计算,计算出该设备属于哪个队列,之后对应该设备ID的消息都会被发送到相同的队列。接着使用for循环,模拟100个设备上报,首先生成消息,然后使用send方法进行推送到指定的队列。

4. 状态消费的有序性

在消息生产的时候保持了有序性,为了实现消息消费的有序性,消费消息时需要严格按照接收—处理—应答的语义处理消息。

其代码示例如下:

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("device_status_topic", "device_status_tag");
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // 遍历消息列表
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                // 返回消费状态
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.println("Consumer started");

        // 等待一段时间后关闭消费者实例
        Thread.sleep(60000);
        consumer.shutdown();
    }
}

在代码中,创建了消费组实例,设置了MQ服务的地址,订阅了指定的主题和标签。其中较为重要的一步是注册消息监听器,在该方法中,使用了顺序消费,即在队列里面的消息会根据存储的先后顺序推送给消费者,进行消费。如果消费不成功,会重新将该消息放到队列的头部,重新推送。

需要注意的是,在RocketMQ中,同一消息队列上的消息是有序的,但不同消息队列之间的消息是无序的。因此,我们需要通过消息队列选择器来确保同一设备的消息发送到同一消息队列上,从而实现对同一设备的消息顺序消费。

5. 小结

如上介绍了如何实现消息推送的有序性,其核心原则:先发送的先消费、后发送的后消费。生产者通过设置消息队列选择器来实现消息的顺序生产。消费者通过注册消息监听器来实现消息的顺序消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/566015.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity之ShaderGraph 节点介绍 Input输入节点

目录 Input&#xff08;输入&#xff09;  1、Basic&#xff08;基本&#xff09;   1) Boolean&#xff08;布尔&#xff09;   2) Color&#xff08;颜色&#xff09;   3) Constant&#xff08;常量&#xff09;   4) Integer&#xff08;整型&#xff09;   5)…

weblogic CVE-2023-21839 复现

影响版本 Weblogic 12.2.1.3.0 Weblogic 12.2.1.4.0 Weblogic 14.1.1.0.0 这里是用的docker下载的vulhub的CVE-2023-21839 靶机和攻击机都是192.168.85.131 docker 启动环境 ocker-compose up -d 然后看一下说明书 vim README.zh-cn.md 让你访问ip:7001/console 好&a…

chatgpt赋能python:PythonWoody:网站优化工具的首选

Python Woody: 网站优化工具的首选 在当今数字化时代&#xff0c;网站被认为是企业的门面。 然而&#xff0c;这只是建立网络存在的起点。 在许多情况下&#xff0c;优化网站并提高其排名对于企业的成功至关重要。 在这里&#xff0c;Python Woody成为了网站优化工具的首选。 …

LVGL-最新版本及其版本定义标准

lvgl的最新版本是9.0.0&#xff0c;处于开发分支中。 稳定版本是8.3.0. 建议一般开发使用稳定版8.3.0. .\lvgl.h定义了当前版本 /*************************** CURRENT VERSION OF LVGL ***************************/ #define LVGL_VERSION_MAJOR 8 #define LVGL_VERSION_MINO…

《JavaEE》HTTPS

文章目录 HTTPS起源HTTPS对称加密非对称加密两者的区别 HTTPS的安全问题使用对称加密正常交互黑客入侵解决方案 非对称加密引入非对称加密后的流程 中间人攻击黑客的入侵方案加入后的流程解决方案黑客再次加注解决方案 ​&#x1f451;作者主页&#xff1a;Java冰激凌 &#x1…

ChatGPT突然上线APP!iPhone可用、速度更快,GPT-4用量限制疑似取消

新建了一个网站 ChatGPT人工智能中文站 - ChatGPT人工智能中文站http://ai.weoknow.com 每天给大家更新可用的国内可用chatGPT免费镜像站 OpenAIChatGPT正式推出iOS应用程序的官方公告突然发布。 立即在苹果商店的免费列表中排名第二&#xff0c;在效率列表中排名第一。 &am…

VScode+LaTeX 配置时遇到的一些问题

文章目录 VScodeLaTeX 配置时遇到的一些问题1. json 配置文件总览2. 使用 SumatraPDF 作为 pdf 阅读器时的双向跳转3. 选择使用 VScode 内置的 tab 打开 pdf 或者使用外部 SumatraPDF 打开4. 关于 LaTeX Workshop 插件的安装 VScodeLaTeX 配置时遇到的一些问题 1. json 配置文…

『MySQL 实战 45 讲』17 - 如何正确地显示随机消息?(随机抽取 3 个词)

如何正确地显示随机消息&#xff1f;&#xff08;随机抽取 3 个词&#xff09; 需求&#xff1a;从用户的英语单词表中&#xff0c;随机选择三个单词&#xff0c;创表和插入数据如下&#xff1a; # 建表 CREATE TABLE words (id INT(11) NOT NULL AUTO_INCREMENT,word VARCHA…

Chatgpt版本的opencv安装教程

文章目录 前言一、安装opencv方法一二、安装opencv方法二 前言 最近刚买了台RTX 3070的电脑&#xff0c;顺手刷了个ubuntu系统专门玩Carla&#xff0c;为了方便查资料&#xff0c;也顺手搭了浏览chatgpt的环境&#xff0c;用的clash&#xff0c;还挺好用的。然后刚好在看Carla…

(转载)MATLAB智能算法30个案例分析(4)——基于遗传算法的TSP算法

1 理论基础 TSP(traveling salesman problem,旅行商问题)是典型的NP完全问题&#xff0c;即其最坏情况下的时间复杂度随着问题规模的增大按指数方式增长&#xff0c;到目前为止还未找到一个多项式时间的有效算法。 TSP问题可描述为&#xff1a;已知n个城市相互之间的距离&…

chatgpt赋能python:PythonUrwid:一个优秀的控制台UI工具

Python Urwid&#xff1a;一个优秀的控制台UI工具 在开发控制台应用程序时&#xff0c;通常需要一种轻而易举的方法来创建用户界面。Python Urwid是一个高效&#xff0c;可定制的控制台UI工具&#xff0c;它可以帮助你创建强大的用户界面&#xff0c;同时获取出色的响应时间。…

SpringCloudAlibaba:继解决登录问题之后,Sentinel持久化没有效果问题

说实话好麻烦&#xff0c;每次使用关于Nacos的时候&#xff0c;bootstrap.yaml中都得配置username和password。 我后悔了。。。 哪位大哥有好办法啊&#xff01;&#xff01;&#xff01; 因为之前开启登录鉴权&#xff0c;导致使用Nacos就得配username和password&#xff0c…

day2 - 使用OpenCV进行图像的读取与展示

本期将使用OpenCV对图像进行一些基本的了解和操作&#xff1a;主要包含图像的读取、展示和保存&#xff0c;以及查看图像的基本属性&#xff0c;让我们充分的了解图像&#xff0c;为后续图像处理做准备。 完成本期内容&#xff0c;你可以&#xff1a; 会使用OpenCV对图像进行读…

Redis常用命令详解

Redis 是Remote Dictionary Service 的简称&#xff1b;也是远程字典服务。它是内存数据库&#xff0c;KV 数据库&#xff0c;数据结构数据库。它是一个单线程的单reactor模型。其交互方式是请求响应方式。在正常情况下&#xff0c;如果向redis发出请求&#xff0c;则一定会有响…

MyBatis 框架

MyBatis 框架 MyBatis 简介搭建 MyBatis 开发环境核心配置文件详解mapper 映射文件&#xff08;实现增删改查&#xff09;MyBatis获取参数值的两种方式MyBatis的各种查询功能特殊SQL的执行自定义映射resultMapresultMap 字段和属性的映射多对一映射处理一对多映射处理 动态SQLM…

11 - YOLO算法二 (目标检测)

要点&#xff1a; 三 YOLO v3 3.1 Darknet-53 &#xff08;backbone&#xff09; 3.2 目标边界框的预测 将预测的边界框中心限制在当前cell中&#xff0c; s(x) Sigmoid(x) 。 3.3 正负样本的匹配 3.4 损失的计算 3.4.1 置信度损失 (Binary Cross Entropy) 其中 表示预测…

能耗监测系统在淼泉卫生院项目的应用

摘要&#xff1a;随着社会生活水平的提高&#xff0c;经济的繁荣发展&#xff0c;人们对能源的需求逐渐增长&#xff0c;由此带来的能源危机日益严重。办公建筑、医院、商场等场所如何实时的了解、分析和控制能源消耗已成为需要解决的迫在眉睫的难题。传统的能源消耗只能以月/季…

CPU性能优化:分支预测

条件跳转引起的控制冒险虽然也可以通过在流水线中插入空泡来避免&#xff0c;但是当流水线很深时&#xff0c;需要插入更多的空泡。一个20级的流水线为例&#xff0c;如果一条指令需要上一条指令的执行结束才能执行&#xff0c;则需要在这两条指令之间插入19个空泡&#xff0c;…

STL详解— list类的模拟实现

本文章所需实现三个类及其每个类里的各个函数接口总览&#xff1a; namespace zhc {//模拟实现list当中的结点类template<class T>struct list_node{//成员函数list_node(const T& val T()); //构造函数//成员变量T _val; //数据域list_node<T&g…