使用MASA Stack+.Net 从零开始搭建IoT平台 第五章 使用时序库存储上行数据

news2024/12/24 21:42:07

目录

  • 前言
  • 分析
  • 实施步骤
    • 时序库的安装
    • 解决playload没有时间戳问题
    • 代码编写
  • 总结


前言

我们可以将设备上行数据存储到关系型数据库中,我们需要两张带有时间戳的表(最新数据表历史数据表),历史数据表存储所有设备上报的数据,最新数据表需要存储设备最新一条上报数据,这条最新数据相当于设备的当前状态。然后展示的时候只展示最新一条数据的状态,报表查询可以按照设备id和时间从历史数据表查询汇总。
这样是可以的,但是我们的最新数据表需要被频繁的更新,数据量少的时候没问题。但数据量大,并发高的时候就会出现问题。
1、存储成本:数据不会被压缩,导致占用存储资源。
2、维护成本:单表数据量太大时,需要人工分库分表。
3、写入性能:单机写入吞吐量难以满足大量上行数据的写入需求,数据库存在性能瓶颈。
4、查询性能:数据量太大导致查询性能受到影响。

分析

我们可以采用时序库来解决上述问题,首先来了解一下什么是时序数据。时序数据是按照时间维度进行索引的数据,它记录了某个被测量实体在一定时间范围内,每个时间点上的一组测试值。传感器上传的室内PM2.5和甲醛数据、净水器传感器当前的TDS值、计算机系统的监控数据等,都属于时序数据,时序数据有如下特点:
1、数据量较大,写入操作是持续且平稳的,而且写多读少。
2、只有写入操作,几乎没有更新操作,比如去修改传感器的历史数据,是没有意义的。
3、没有随机删除,即使删除也是按照时间范围进行删除。删除某一个时间点的数据没有意义,但是删除2年前的数据是有意义的。
4、数据实时性和时效性强,数据随着时间的推移不断追加,旧数据很快失去意义。
5、大部分以时间和实体为维度进行查询,很少以测试值为维度查询,比如用户会查询某个时间段的温度数据,但是很少会去查询温度高于多少度的数据记录。
显然IoT的业务是符合使用时序库的场景的。
序数据库就是用来存储时序数据的数据库,时序数据库相较于传统的关系型数据和非关系型数据库而言,专门优化了对时序数据的存储,开源的时序数据库有InfluxDB OpenTSDB、TimeScaleDB 等。本文以InfluxDB数据库进行演示。
时序数据库有如下几个概念。
1.Metric:度量,相当于关系型数据库中的表(table)。
2.Data Point:数据点,相当于关系型数据库的中的行(row)。
3.Timestamp:时间戳,数据点生成时的时间戳。
4.Field:测量值,比如温度和湿度、PM2.5等。
5.Tag:标签,用于标识数据点,通常用来标识数据点的来源,比如温度和湿度数据来自哪个房间,哪个设备,可以当作关系型数据库表的主键。

如下图,度量为 Wind,每一个数据点都具有一个 timestamp,两个 field:direction 和 speed,两个 tag:sensor、city。它的第一行和第三行,存放的都是 sensor 号码为 95D8-7913 的设备,属性城市是上海。随着时间的变化,风向和风速都发生了改变,风向从 23.4 变成 23.2;而风速从 3.4 变成了 3.3。

图片来自网络

实施步骤

时序库的安装

安装参考官方文档,为了方便,我这里采用docker安装

docker run --name influxdb -p 8086:8086 influxdb:2.7.0

https://docs.influxdata.com/influxdb/v2.7/install/

我们打开 服务器ip:8086 可以看到它自带的管理界面,我们首先创建用户名密码,组织、以及Bucket的名称。
这里的bucket “IoTDemos” 相当于数据库的名称

我们记录一下这个Token,一会连接influxdb需要,相当于账号密码

解决playload没有时间戳问题

对于时序库来讲,时间戳是非常重要的,但是我们拿到的playload并没有时间戳(MQTTNet包我没有找到拿时间戳的方法)。
所以我们需要在mqtt上想办法,让设备上报数据的时候,mqtt自动添加时间戳到playload中。
1、我们在数据集成->规则中新建一条规则名称为"Add_Ts"。SQL编写如下

SELECT
  *,
  now_timestamp('millisecond') as payload.Ts
FROM
  "topic/#"

topic/# 代表消息发布到"topic/#"主题的事件
now_timestamp函数返回当前时间的 Unix 时间戳,我们将时间戳写入到payload的Ts属性中,关于更多内置SQL函数,请参考官方文档

https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-builtin-functions.html

2、我们打开下面的调试,模拟设备上报一条数据,可以看到这条规则帮我们加入了时间戳。

3、然后我们还需要处理添加了时间戳的处理结果,我们在右侧添加一个动作,选择消息重发布,将刚刚添加了时间戳的消息重发到一个新的Topic上,我们使用topic/dp,并在playload中添加${payload},这样我们就修改了playload中的信息,添加了我们需要的时间戳,当然,我们Hub订阅的消息也需要对应修改,添加/dp后缀。

4、首先我们先修改MASA.IoT.Hub的配置文件,Topic添加"/dp"后缀

  "MqttSetting": {
...
    "Topic": "$share/IotHub/topic/+/dp"
  },

5、CallbackAsync中,因为我们设备名称是从Topic截取的,也要对应修改一下。

    private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);

        Console.WriteLine(deviceDataPointStr);
        var pubSubOptions = new PubSubOptions
        {
            //修改一下获取设备名称的方式
            DeviceName = e.ApplicationMessage.Topic[6..^3],
            Msg = deviceDataPointStr,
            PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
            TrackId = Guid.NewGuid()
        };                            
...
    }

代码编写

解决完时间戳的问题,我们就可以编写代码向InfluxDB中写入数据了,我们首先在Infrastructure文件夹下创建ITimeSeriesDbClient接口和TimeSeriesDbClient类,使用接口也方便我们日后更换其他的时序库。
这里使用了InfluxDB.Client包。
ITimeSeriesDbClient.cs

namespace MASA.IoT.Core.Infrastructure
{
    public interface ITimeSeriesDbClient
    {
        bool WriteMeasurement<T>(T measurement);
    }
}

TimeSeriesDbClient.cs

using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using MASA.IoT.WebApi;
using Microsoft.Extensions.Options;

namespace MASA.IoT.Core.Infrastructure
{
    public class TimeSeriesDbClient : ITimeSeriesDbClient
    {
        private readonly InfluxDBClient _client;
        private readonly string _bucket;
        private readonly string _org;
        private readonly AppSettings _appSettings;
        
        public TimeSeriesDbClient(IOptions<AppSettings> settings)
        {
            _appSettings = settings.Value;
            _org = _appSettings.InfluxDBSetting.Org;
            _bucket = _appSettings.InfluxDBSetting.Bucket;
            _client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token);
        }

        public bool WriteMeasurement<T>(T measurement)
        {
            try
            {
                using var writeApi = _client.GetWriteApi();
                writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org);
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                return false;
            }
        }
    }
}

这里使用new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)来构造InfluxDBClient。
Token就是我们创建Bucket过程中保存的Token
Url是我们InfluxDB的访问地址:http://127.0.0.1:8086
写入的方法WriteMeasurement中我们通过**_client.GetWriteApi创建一个写入的api然后直接将我们要写入的泛型实体写入,第二个可选参数代表写入精度,这里我们使用WritePrecision.Ms**
我们在DeviceHandler.cs中注入ITimeSeriesDbClient 并添加一个WriteMeasurementAsync方法,在方法中我们先根据设备名称获取产品,如果识别产品ID为10001(空净产品),
那么我们就写入数据到Measurement:AirPurifierDataPoint
Measurement相当于数据库的表。
MeasurementColumn特性都是InfluxDB.Client.Core提供的,可以用来标识TagTimestamp

using InfluxDB.Client.Core;
using Newtonsoft.Json;

namespace MASA.IoT.Core.Contract
{
    [Measurement("AirPurifierDataPoint")]
    public class AirPurifierDataPoint
    {
        /// <summary>
        /// 设备名称
        /// </summary>
        [Column("DeviceName", IsTag = true)] public string DeviceName { get; set; }

        /// <summary>
        /// 产品ID
        /// </summary>
        [Column("ProductId", IsTag = true)] public Guid ProductId { get; set; }

        /// <summary>
        /// Pm2.5
        /// </summary>
        [Column("PM_25")] public double? Pm_25 { get; set; }
        /// <summary>
        /// 温度
        /// </summary>
        [Column("Temperature")] public double? Temperature { get; set; }
        /// <summary>
        /// 湿度
        /// </summary>
        [Column("Humidity")] public double? Humidity { get; set; }
        /// <summary>
        /// 时间戳
        /// </summary>
        [JsonProperty(propertyName: "Ts")]
        [Column(IsTimestamp = true)] public long Timestamp { get; set; }
    }
}

    public class DeviceHandler : IDeviceHandler
    {
        private readonly MASAIoTContext _ioTDbContext;
        private readonly IMqttHandler _mqttHandler;
        private readonly ITimeSeriesDbClient _timeSeriesDbClient;

        public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient)
        {
            _ioTDbContext = ioTDbContext;
            _mqttHandler = mqttHandler;
            _timeSeriesDbClient = timeSeriesDbClient;
        }

        /// <summary>
        /// 写入数据
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="pubSubOptions"></param>
        /// <returns></returns>
        public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions)
        {
            var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking()
                .FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName);

            if (device != null && device.ProductInfo.ProductCode == "10001")  //空气净化器产品
            {
                var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg);

                airPurifierDataPoint.ProductId = device.ProductInfoId;
  
                return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint);

            }
            return false;
        }

除了WriteMeasurement方法之外,还提供了很多其他方法,如WritePoint,和批量写入的方法,可自行测试。
#测试
我们启动项目,通过MQTTX向**“topic/284202304230001”**上报一条数据

{
  "DeviceName":"284202304230001",
  "Pm_25":100,
  "Temperature":25,
  "Humidity":50
}

我们在influxDB的管理工具中使用Data Explorer,使用如下的flux query查询语句,即可查出5分钟之内的数据,注意,这里的时间是UTC时间

如果想显示北京时区方便调试,可以在后面添加**|> timeShift(duration: 8h)**

from(bucket: "IoTDemos") 
|> range(start:-5m)


关于flux查询语法

https://docs.influxdata.com/flux/v0.x/

总结

本节我们简单介绍了开源时序数据库influxDB的安装。
我们借助InfluxDB.Client库完成设备从上报到时序库数据存储的全过程,下一节我们介绍从时序库查询数据。

完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/686866.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

iptables详解

iptables简介 netfilter/iptables&#xff08;简称为iptables&#xff09;组成Linux平台下的包过滤防火墙&#xff0c;完成封包过滤、封包重定向和网络地址转换&#xff08;NAT&#xff09;等功能。 iptables 规则&#xff08;rules&#xff09;其实就是网络管理员预定义的条…

神通数据库X86架构适配DJANGO317指南

制作神通数据库镜像 1&#xff09;、下载docker.io/centos:7.9.2009镜像&#xff0c;docker pull docker.io/centos:7.9.2009 2)、运行一个容器&#xff0c;docker run -itd --name shentong -p 2003:2003 --privilegedtrue --restartalways -v /sys/fs/cgroup:/sys/fs/cgrou…

万字详解JavaScript手写一个Promise

目录 前言Promise核心原理实现 Promise的使用分析MyPromise的实现在Promise中加入异步操作 实现then方法的多次调用 实现then的链式调用 then方法链式调用识别Promise对象自返回 捕获错误及 then 链式调用其他状态代码补充 捕获执行器错误捕获then中的报错错误与异步状态的链式…

硬盘设备出现“设备硬件出现致命错误,导致请求失败”怎么办?

当我们尝试访问或打开计算机上的硬盘设备&#xff0c;有时候会出现“设备硬件出现致命错误&#xff0c;导致请求失败”的错误提示&#xff0c;这该怎么办呢&#xff1f;下面我们就来了解一下。 出现“设备硬件出现致命错误&#xff0c;导致请求失败”错误的原因有哪些&#xff…

机器学习之SVM支持向量机

目录 经典SVM 软间隔SVM 核SVM SVM分类器应用于人脸识别 SVM优点 SVM缺点 经典SVM 支持向量机&#xff08;Support Vector Machine&#xff0c;SVM&#xff09;是一种二分类模型&#xff0c;其基本思想是在特征空间中找到一个最优的超平面&#xff0c;使得正负样本点到…

数据结构 队列(C语言实现)

绪论 任其事必图其效&#xff1b;欲责其效&#xff0c;必尽其方。——欧阳修&#xff1b;本篇文章主要写的是什么是队列、以及队列是由什么组成的和这些组成接口的代码实现过程。&#xff08;大多细节的实现过程以注释的方式展示请注意查看&#xff09; 话不多说安全带系好&…

Python3,关于请求重试,这次requests库给安排的明明白白。

requests库重试请求 1、引言2、requests库2.1 安装2.2 代码实例2.2.1 重试次数设置2.2.2 重试条件设置2.2.3 超时时间设置 3、总结 1、引言 小屌丝&#xff1a;鱼哥&#xff0c; 你看这是啥&#xff1f; 小鱼&#xff1a;我瞅瞅… 小屌丝&#xff1a;鱼哥&#xff0c;你这眼神…

【计算机视觉】Fast Segment Anything 安装步骤和示例代码解读(含源代码)

文章目录 一、导读二、安装步骤2.1 将存储库克隆到本地2.2 创建 conda 环境2.3 安装软件包2.4 安装 CLIP2.5 下载权重文件2.6 开始使用2.6.1 Everything mode2.6.2 Text prompt2.6.3 Box prompt (xywh)2.6.4 Points prompt 三、示例代码 一、导读 论文地址&#xff1a; https:…

服务器配置与操作

服务器配置与操作 一、连接远程服务器 推荐用xshell 或者 finalshell 或者 winSCP 或者 FileZilla xshell下载地址&#xff1a;https://xshell.en.softonic.com/ 二、服务器配置 2.1 安装JDK 2.1 方法一&#xff1a;在线安装 yum list java* yum -y install java-1.8.0-ope…

【Django | 爬虫 】收集某吧评论集成舆情监控(附源码)

&#x1f935;‍♂️ 个人主页: 计算机魔术师 &#x1f468;‍&#x1f4bb; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 文章目录 一、爬取帖子、二级评论二、构建数据表三、并入项目1. spider代码2. view视图代码3. 优化后台界面3. urls路由 四、定…

第二十三章Java二维数组详解

一、创建二维数组 在 Java 中二维数组被看作数组的数组&#xff0c;即二维数组为一个特殊的一维数组&#xff0c;其每个元素又是一个一维数组。Java 并不直接支持二维数组&#xff0c;但是允许定义数组元素是一维数组的一维数组&#xff0c;以达到同样的效果。声明二维数组的语…

编程规范-控制流程、错误和异常处理

前言&#xff1a; \textcolor{Green}{前言&#xff1a;} 前言&#xff1a; &#x1f49e;这个专栏就专门来记录一下寒假参加的第五期字节跳动训练营 &#x1f49e;从这个专栏里面可以迅速获得Go的知识 今天的笔记是对编程规范的补充&#xff0c;对控制流程、错误和异常处理进行…

Ansys Zemax | 内窥镜物镜系统初始结构的优化提升(下)

系统性能提升 根据上篇的内窥镜系统分析&#xff0c;我们可以从四个方面对内窥镜物镜系统进行优化&#xff1a;元件间距、圆锥系数、MTF 值以及畸变值。点击优化-评价函数编辑器以设置具体的评价函数。&#xff08;联系我们获取文章附件&#xff09; 首先&#xff0c;用三个 CO…

NXP i.MX 8M Plus工业开发板硬件说明书--下册( 四核ARM Cortex-A53 + 单核ARM Cortex-M7,主频1.6GHz)

前 言 本文档主要介绍创龙科技TLIMX8MP-EVM评估板硬件接口资源以及设计注意事项等内容。 创龙科技TLIMX8MP-EVM是一款基于NXP i.MX 8M Plus的四核ARM Cortex-A53 单核ARM Cortex-M7异构多核处理器设计的高性能工业评估板&#xff0c;由核心板和评估底板组成。ARM Cortex-A5…

【AndroidUI设计】Bottom Navigation Activity中Fragment(碎片)的添加和下层导航图标的修改

文章目录 一、引言二、设计1、添加Fragment&#xff08;1&#xff09;确认需求&#xff08;2&#xff09;创建 <1> 方法一&#xff1a;借助工具快速生成 <2> 方法二&#xff1a;视图&#xff08;图层&#xff09;工具 <3> 方法三&#xff1a;手动…

知网G4《语数外学习》简介及投稿邮箱

知网G4教育专刊《语数外学习》简介及投稿邮箱 《语数外学习》全新改版&#xff0c;分别针对初中三个不同年级&#xff0c;每本仍然兼顾语数外三个学科。改版后的《语数外学习》将密切关注课改和中考改革的进程&#xff0c;与教材同步&#xff0c;在帮中学生朋友释疑疑惑、提高…

DOTA-PEG3-azide,1428146-79-5,DOTA三聚乙二醇叠氮,试剂相关研究说明

DOTA-PEG3-azide&#xff0c;DOTA PEG3 N3&#xff0c;DOTA三聚乙二醇叠氮产品结构式&#xff1a; 产品规格&#xff1a; 1.CAS号&#xff1a;1428146-79-5 2.分子式&#xff1a;C24H44N8O10 3.分子量&#xff1a;604.66 4.包装规格&#xff1a;白色固体 &#xff0c;1g、5g、1…

数据库性能测试

目录 前言&#xff1a; 1.引入数据库驱动包 2.添加数据库配置元件 3、JDBCRequest参数化 4、Variablesnames参数使用方法&#xff1a; 前言&#xff1a; 数据库性能测试是测试数据库系统在各种条件下的性能和稳定性的过程。它可以帮助测试人员识别数据库系统的性能瓶颈&a…

30余名「实在RPA·数字员工」在纳爱斯诞生,在618中服务千万消费者!

积水成渊&#xff0c;聚沙成塔&#xff01;谁在世界数字化大势中不断变革自己&#xff1f; 长期蝉联“中国品牌价值评价”日化行业首位&#xff0c;问鼎中国工业“奥斯卡”大奖的“大国品牌”纳爱斯——当仁不让&#xff01; 纳爱斯是日化行业领军企业&#xff0c;业务覆盖家…

SpringBoot整合MybatisPlus 自动生成controller、mapper、entity、service

首先创建SpringBoot项目 选择依赖 把application的后缀改为.yml&#xff0c;方便些。 pom.xml&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w…