如何顺序处理设备上报的数据

news2024/11/25 22:29:52
1. 引言

随着智能技术的发展,市场上出现了很多的智能设备,其具有连接网络的能力。用户可以实现远程控制,并且设备也可上报自己的状态,实现云端对设备的运行情况分析。在某些情况下需要保证设备上报状态的有序性,例如传感器数据采集和处理,其中数据的顺序很重要,因为它们可能表示实时的物理过程或事件。为了能实现消息顺序消费,可以用什么办法实现呢?

2. 消息队列实现消息的顺序消费

顺序消息是RocketMQ提供的一种高级消息类型,消费者按照发送消息的顺序性去处理消息。即在进行设备状态上报的时候消息发送到消息队列里面时就要先保持有序,之后进行消息处理时才能获取有序的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传在这里插入图片描述

如上图所示,在设备端采集设备的状态,放到MQ进行存储起来,起到缓冲作用,之后再去消费消息。

因此,生产者进行发送消息时,就需要保持消息的有序性。

3. 状态上报的有序性

为保证状态推送的有效性,需要指定消息的关键字,其中设备ID是唯一标识设备,因此同一个设备上报的状态就被同一个队列中,这样就能保持统一设备的状态是有序的。

其代码示例如下:

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者实例
        producer.start();
        
        // 设置消息队列选择器
        producer.setQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // 获取设备 ID
                String deviceId = (String) arg;
                // 计算队列索引
                int index = Math.abs(deviceId.hashCode()) % mqs.size();
                // 返回对应的消息队列
                return mqs.get(index);
            }
        });

        // 发送消息,模拟100个设备上报状态
        for (int i = 0; i < 100; i++) {
            // 构建消息体
            Message msg = new Message("device_status_topic", "device_status_tag", ("Device status " + i).getBytes());
            // 设置设备 ID 作为消息队列选择器的参数
            String deviceId = "device_" + i % 10;
            SendResult sendResult = producer.send(msg, deviceId);
            System.out.println(sendResult);
        }

        // 关闭生产者实例
        producer.shutdown();
    }
}

如上述的代码所示,首先创建了生产者,定义了MQ服务器地址,启动的生产者实例;接着以设备ID为参数,以取模的方式进行哈希计算,计算出该设备属于哪个队列,之后对应该设备ID的消息都会被发送到相同的队列。接着使用for循环,模拟100个设备上报,首先生成消息,然后使用send方法进行推送到指定的队列。

4. 状态消费的有序性

在消息生产的时候保持了有序性,为了实现消息消费的有序性,消费消息时需要严格按照接收—处理—应答的语义处理消息。

其代码示例如下:

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("device_status_topic", "device_status_tag");
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // 遍历消息列表
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                // 返回消费状态
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.println("Consumer started");

        // 等待一段时间后关闭消费者实例
        Thread.sleep(60000);
        consumer.shutdown();
    }
}

在代码中,创建了消费组实例,设置了MQ服务的地址,订阅了指定的主题和标签。其中较为重要的一步是注册消息监听器,在该方法中,使用了顺序消费,即在队列里面的消息会根据存储的先后顺序推送给消费者,进行消费。如果消费不成功,会重新将该消息放到队列的头部,重新推送。

需要注意的是,在RocketMQ中,同一消息队列上的消息是有序的,但不同消息队列之间的消息是无序的。因此,我们需要通过消息队列选择器来确保同一设备的消息发送到同一消息队列上,从而实现对同一设备的消息顺序消费。

5. 小结

如上介绍了如何实现消息推送的有序性,其核心原则:先发送的先消费、后发送的后消费。生产者通过设置消息队列选择器来实现消息的顺序生产。消费者通过注册消息监听器来实现消息的顺序消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/578626.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

简单介绍二叉树

前言 学习数据结构&#xff0c;二叉树是一大难点&#xff0c;也是一大重点&#xff0c;小伙伴们和我一起看看二叉树的知识吧&#xff01; 本文代码是Java。 目录 前言 一、什么是二叉树 二、二叉树的遍历 &#xff08;一&#xff09;前序遍历 &#xff08;二&#xff09;中…

C++ 入门导引(这是一篇由GPT4写的文章)

C 应用场景 C 是一种广泛应用的编程语言&#xff0c;拥有多种使用场景。以下是 C 的一些主要应用场景&#xff1a; ​1. 游戏开发&#xff1a;C 常用于游戏开发&#xff0c;尤其是大型 3D 游戏。它可以轻松地与图形 API&#xff08;如 OpenGL 和 DirectX&#xff09;集成&…

element-ui树形控件el-tree详解

概述 这里我利用element-ui开发一个vue的树形组件 引入element-ui 安装element-plus cnpm install element-plus --save 安装按需导入 cnpm install -D unplugin-vue-components unplugin-auto-import 修改vite.config.js配置按需加载 import AutoImport from unplugin-a…

云原生架构:创新未来的应用开发和部署范式

点击上方“程序猿技术大咖”&#xff0c;关注并选择“设为星标” 回复“加群”获取入群讨论资格&#xff01; 摘要&#xff1a;本文深入探讨云原生架构的优势、实施指南以及关键技术和工具。通过容器化、微服务架构、持续交付和自动化管理等实践&#xff0c;云原生架构为企业提…

盘点一个Python列表的基础题目

点击上方“Python爬虫与数据挖掘”&#xff0c;进行关注 回复“书籍”即可获赠Python从入门到进阶共10本电子书 今 日 鸡 汤 随意春芳歇&#xff0c;王孙自可留。 大家好&#xff0c;我是皮皮。 一、前言 前几天在Python最强王者群【eric】问了一个Python列表基础的问题&#x…

web练习第二周

前言&#xff1a;&#xff08;博主个人学习笔记&#xff0c;不用看&#xff09;web练习第二周&#xff0c;仅做出前3题。相比于第一周&#xff0c;难度大幅增加&#xff0c;写题时就算看了wp还是像个无头苍蝇一样到处乱创&#xff0c;大多都是陌生知识点&#xff0c;工具的使用…

购买两块巧克力-第105场力扣夜喵双周赛-java双百方案

一、题目描述 给你一个整数数组 prices &#xff0c;它表示一个商店里若干巧克力的价格。同时给你一个整数 money &#xff0c;表示你一开始拥有的钱数。 你必须购买 恰好 两块巧克力&#xff0c;而且剩余的钱数必须是 非负数 。同时你想最小化购买两块巧克力的总花费。 请你…

【原创】浅谈EtherCAT主站EOE(上)-EOE网络

版权声明&#xff1a;本文为本文为博主原创文章&#xff0c;未经同意&#xff0c;禁止转载。如有问题&#xff0c;欢迎指正。博客地址&#xff1a;https://www.cnblogs.com/wsg1100/ 文章目录 一、EoE二、EoE服务规范EtherCAT主站如何提供EoE服务&#xff1f; 三、EoE网络EOE网…

我用GPT搭建了一个虚拟女友!

Datawhale干货 作者&#xff1a;仲泰&#xff0c;Datawhale成员 1. 作者知乎&#xff1a;https://www.zhihu.com/people/yong-tan-39-67 2.我用GPT搭建了一个虚拟女友-哔哩哔哩&#xff1a;https://b23.tv/GYYwMcq 3. 五月学习&#xff1a;ChatGPT应用组队学习来了&#xff01…

原生canvas标签画线——直线、平行线(设置不同颜色和宽度)

1.一条直线 效果图如下&#xff1a; 代码如下&#xff1a; <!--* Author: your name* Date: 2023-05-24 17:50:28* LastEditTime: 2023-05-24 18:06:39* LastEditors: localhost* Description: In User Settings Edit* FilePath: /canvas/day01/体验canvas.html --> &l…

Python潮流周刊#3:PyPI 的安全问题

△点击上方“Python猫”关注 &#xff0c;回复“1”领取电子书 你好&#xff0c;我是猫哥。这里记录每周值得分享的 Python 及通用技术内容&#xff0c;部分为英文&#xff0c;已在小标题注明。&#xff08;标题取自其中一则分享&#xff0c;不代表全部内容都是该主题&#xff…

网站部署与上线(2)远程连接云服务器或虚拟机

文章目录 搭建服务器部署环境配置pm2 可能听说过Windows系统提供的远程桌面。实际上&#xff0c;Linux中也提供了类似的功能&#xff0c;其远程连接基于命令行。 在Windows端连接Linux需要使用SSH软件&#xff0c;最流行的有Xshell和SecureCRT。 首先确定需要连接的云服务器或虚…

Pyside6-第一篇-创建第一个窗口

Hi&#xff0c;今天起开始更新Pyside6教程了&#xff0c;从0-1开始更新&#xff0c;过程比较的久&#xff0c;一点点来。 今天&#xff0c;我们先来搭建环境。 我的环境&#xff1a; ❝ pycharm 2021.3.3(版本随意&#xff0c;只要不是很低就行)Python版本3.95Pyside版本6.50 ❞…

【FreeRTOS】——中断优先级设置中断相关寄存器临界段代码保护调度器挂起与恢复

目录 前言&#xff1a; 一、中断优先级设置 二、中断相关寄存器&#xff08;STM32-Cortex M3&#xff09; 三、临界段代码保护 四、任务调度器的挂起和恢复 总结&#xff1a; 前言&#xff1a; 博客笔记根据正点原子视频教程编辑&#xff0c;仅供学习交流使用&#xff0…

电子器件系列38:mos管散热片

板子上需要用到一个封装为to220的mos管&#xff0c;还得立起来散热&#xff0c;得要加一个散热片。 散热片简介&#xff0c;分类&#xff1f;用途&#xff1f;如何使用&#xff1f;封装&#xff1f;使用注意事项&#xff1f; 简介&#xff1a; mos散热片是一种给电器中的易发热…

线程池实现

一、线程池介绍 1&#xff09;应用场景 当并发数很多的时候&#xff0c;并且每个线程执行时间很短的任务&#xff0c;这样就会频繁创建线程&#xff0c;而这样的频繁创建和销毁线程会大大降低系统的执行效率。对于这种场景我们可以使用线程池来复用之前创建的线程&#xff0c…

Linux—基础篇:目录结构

1、基本介绍 1、linux的文件系统是采用级层式的树状目录结构&#xff0c;在此结构的最上层是根目录“/”,然后在此目录下创建其他目录 2、在Linux的世界里&#xff0c;一切皆文件&#xff01;&#xff01;&#xff01; 2、具体的目录结构 不用背&#xff0c;知道即可 2.1、…

价值1000元的稀有二开版的无限坐席在线客服系统源码+教程

demo软件园每日更新资源,请看到最后就能获取你想要的: 1.价值1000元的稀有二开版的无限坐席在线客服系统源码教程 价值1000元的稀有二开版的无限坐席在线客服系统源码 直接一键安装的&#xff0c;启动两个端口就行了&#xff0c;安装倒是简单 类型&#xff1a;在线客服系统 …

MathType7精简版数学公式编辑器

许多简单的数学公式&#xff0c;我们可以使用输入法一个个找到特殊符号并输入&#xff0c;但是对于高等数学中较多复杂的公式符号&#xff0c;是很难使用输入法完成的。那么&#xff0c;我们就需要借助公式编辑器&#xff0c;这里推荐一款我自己正在使用的MathType。 MathType是…

Redis单机数据库

文章目录 一、Redis数据库Redis数据库redisDb数据库键空间——dict过期字典——expires设置键的生存时间移除键的过期时间返回键的生存时间 Redis的过期删除策略1、定期删除2、惰性删除3、内存淘汰机制 过期键处理1、RDB功能对过期键的处理2、AOF功能对过期键的处理3、复制功能…