TDengine 集成 EMQX 通过规则引擎实现设备数据直接入库

news2024/11/26 21:25:11

背景

曾使用过 IoTDB 自带的 MQTT Broker 实现了设备数据入库,那么使用 TDengine 时,我们可以借助 EMQX (一款优秀的国产开源 MQTT Broker )的规则引擎结合 TDengine 的 RESTful API 完成设备数据的路由与入库。

  • 用到的工具
  1. TDengine RESTful API
  2. EMQX 规则引擎
  3. TDengine GUI图形化管理工具
  4. Node.js下的MQTT客户端
  5. 虚拟机CentOS操作系统
  • 版本信息
  1. TDengine: 2.2.0.0
  2. EMQX: 4.2.4
  3. Node.js: 12.16.1
  4. CentOS: 7

TDengine创建数据库表

create database if not exists ok;

create stable if not exists ok.power(ts timestamp, voltage int, current float, temperature float) tags(sn int, city nchar(64), groupid int);

create table if not exists ok.device1 using ok.power tags(1, "太原", 1);
create table if not exists ok.device2 using ok.power tags(2, "西安", 2);

insert into ok.device1 values("2021-09-04 21:03:38.734", 1, 1.0, 1.0);
insert into ok.device2 values("2021-09-04 21:03:40.734", 2, 2.0, 2.0);

初始数据如下:

2021-09-23-InitialData.jpg

EMQX创建资源

所谓的资源就是将要连接的数据库、中间件等,这里便是 TDengine 的连接,通过其 RESTful API 建立连接,在规则引擎的动作响应中会用到这里的资源。

2021-09-23-TDengineResource.jpg

2021-09-23-ResourceView.jpg

其中关于头信息中的 Authorization 通过以下方式获得。

# 获取token
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl hadoop1:6041/rest/login/root/taosdata
{"status":"succ","code":0,"desc":"/KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04"}

# 测试:附加自定义token在头信息,正常响应
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'select * from ok.power' hadoop1:6041/rest/sql
{"status":"succ","head":["ts","voltage","current","temperature","sn","city","groupid"],"column_meta":[["ts",9,8],["voltage",4,4],["current",6,4],["temperature",6,4],["sn",4,4],["city",10,64],["groupid",4,4]],"data":[["2021-09-04 21:03:38.734",1,1.00000,1.00000,1,"太原",1],["2021-09-04 21:03:40.734",2,2.00000,2.00000,2,"西安",2]],"rows":2}

EMQX创建规则

  • 创建规则:这里直接从主题device/sn中获取payload,结果命名为power

2021-09-23-RuleContent.jpg

  • 测试规则:模拟一条数据,经过测试,定义的规则成功命中。

2021-09-23-RuleTest.jpg

EMQX创建动作响应

当命中数据后,我们的目标是将其存入数据库,那么我们一开始定义的 TDengine 资源就派上用场了。

  1. Action选择Data to Web Server表示我们要将数据发送至Web服务(即 TDengine 的 RESTful API
  2. Resource选择我们创建好的资源
  3. 最后填写Payload Template,写入数据表的SQL语句,这里支持插值:insert into ok.device${power.sn} values ('${power.ts}', ${power.voltage}, ${power.currente}, ${power.temperature})

2021-09-23-Action.jpg

Node.js模拟MQTT客户端

这里通过 Node.js 模拟一个设备,向主题 device/sn 随机发布数据,完成数据上报,当然也可以借助其他客户端来实现。

2021-09-23-NodeClient.jpg

EMQX查看规则引擎Metrics

点击 Rule 菜单下的规则引擎 ID ,可查看已配置的规则详情,还可以看到多少消息被规则命中的度量信息(需刷新页面)。

2021-09-23-Metrics.jpg

TDengine客户端查看数据

数据库中确认写入两条新数据:

2021-09-23-FinalData.jpg

规则引擎扩展

开源版的 EMQX Broker 除了全面支持 MQTT5 新特性、多协议支持外,更强大的地方在于其围绕 MQTT 周边提供了一系列的 WebHook 、 HTTP API 接口以及最为核心的规则引擎。上面我们只是通过主题选择了数据进行规则匹配,其实规则引擎还可以结合一系列的内部事件,编写规则时以$开头,包括客户端连接事件、断开事件、消息确认事件、消息发布事件、订阅事件、取消订阅事件等。

2021-09-23-RuleAdvanced.jpg
EMQX Broker 一开始的定位就是物联网消息中间件,目前开源版本功能已经非常强大,而企业版本与Cloud版本更是提供了高阶功能,全托管、更稳定、更可靠,技术支持更及时。以下是我试用的Cloud版本。

2021-09-23-Cloud.jpg

可能遇到的问题

  • 端口开放问题
    因为通过宿主机访问虚拟机,所以记得关闭防火墙或者开放对应的端口,这里涉及到的端口有:
  1. 6041:TDengine的RESTful API默认端口
  2. 1883:EMQX的MQTT默认端口
  3. 18083:EMQX的Dashboard默认端口
# 关闭防火墙
[root@hadoop1 ~]# systemctl stop firewalld.service

# 放行端口
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 6041 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 1883 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 18083 -j ACCEPT
  • 主题名称不匹配导致规则无法命中

作为约定俗成的实践,一般在编码时 MQTT 的主题不以 / 开头,即写作 device/sn ,而不是 /device/sn 。

刚开始我在 MQTT 客户端发送数据时主题名为 /device/sn ,而规则引擎中的主题为 device/sn ,导致无法匹配。

  • SQL模板中的字符串

这里的ts以字符串形式发送,因此需要将插值用引号括起来:‘${power.ts}’。否则 TDengine 日志报错:

[root@hadoop1 taos]# tailf ./log/taosdlog.0
09/23 08:42:11.707621 00001702 TSC ERROR 0x8e async result callback, code:Syntax error in SQL
09/23 08:42:11.707675 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, user:root, query error, code:Syntax error in SQL, sqlObj:0x7f5f74000c10
09/23 08:42:11.725687 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, code:400, error:Syntax error in SQL
  • 规则引擎的Metrics计数与实际发送数据不符

这与客户端发送数据指定的 QoS 相关,如果 QoS = 1 ,则MQTT协议的重发机制可能导致数据重复发送。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/585608.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能python:Python下载Module的指南

Python下载Module的指南 作为一门高级编程语言,Python凭借其简单易学、高效便捷的特点,越来越受到广大程序员的喜爱。Python社区也逐渐发展壮大,丰富的第三方Module为我们提供了更多功能强大、用途广泛的工具。本篇文章将介绍Python下载Modu…

从汇编代码的角度去理解C++多线程编程问题

目录 1、多线程问题实例 2、理解该多线程问题的预备知识 2.1、二进制机器码和汇编代码 2.2、多线程切换与CPU时间片 2.3、多线程创建与线程函数 3、从汇编代码的角度去理解多线程问题 4、问题解决办法 5、熟悉汇编代码有哪些用处? 5.1、在代码中插入汇编代…

信号处理与分析-傅里叶

目录 一、引言 二、傅里叶级数 1. 傅里叶级数的定义 2. 傅里叶级数的性质 三、傅里叶变换 1. 傅里叶变换的定义 2. 傅里叶变换的性质 四、离散傅里叶变换 1. 离散傅里叶变换的定义 2. 离散傅里叶变换的性质 五、应用实例 1. 信号处理 2. 图像处理 六、总结 一、引…

Revit中窗族的默认窗台高度与底高度是一样?

​  一、窗族的默认窗台高度与底高度是一样的吗? 窗族的系统设定中有一个自带的参数就是默认窗台高度,指的是窗户放置的时候窗户最底端离墙的最底端高度。 当我们创建一个建筑样板将我们创建好的窗族放置好的时候,这个参数就在窗的类型属性中&#xf…

2023年上半年 软件设计师答案解析

前言:2023年上半年软考已经落幕了,学长整理了一下软件设计师的题目以及个人理解的答案(仅供参考)希望能够帮助参加软考的各个小伙伴能够清晰的估分,希望大家都能通过考试~ 目录 2023年上半年 软件设计师 上午试卷 2023…

C Primer Plus第十二章编程练习答案

学完C语言之后,我就去阅读《C Primer Plus》这本经典的C语言书籍,对每一章的编程练习题都做了相关的解答,仅仅代表着我个人的解答思路,如有错误,请各位大佬帮忙点出! 1.不使用全局变量,重写程序…

网络连接中的舔狗协议

舔狗网络协议 (discard protocol) 最近互联网上,“舔狗” 这个词语很火,也衍生出来很多梗(快速说出互联网 4 大舔狗!!!)。然后今天偶然间看到了一个 RFC 文档, 发现了一…

用户需求分析工具:Y模型

用户需求分析工具:Y模型 《人人都是产品经理》作者苏杰提出 阿里巴巴产品经理多年 趣讲大白话:有个框框好同频 【趣讲信息科技180期】 **************************** 很多交流就是鸡同鸭讲 沟通的背景、动机、目的、方式、高度等严重不同 如果有一个模型…

服务器端安装jupyter notebook并在本地使用与环境配置一条龙服务【服务器上跑ipynb】

linux服务器端安装jupyter notebook并在本地使用 1.生成配置文件:2.配置Jupyter notebook密码3,修改配置文件~/.jupyter/jupyter_notebook_config.py4. 本地访问远端的服务器的jupyter1.首先在Linux服务器上启动Jupyter notebook2.然后在本地转发端口 为jupyter notebook配置co…

【云原生|探索 Kubernetes 系列 6】从 0 到 1,轻松搭建完整的 Kubernetes 集群

前言 大家好,我是秋意零。 前面一篇中,我们介绍了 kubeadm 的工作流程。那么今天我们就实际操作一下,探索如何快速、高效地从 0 开始搭建一个完整的 Kubernetes 集群,让你轻松驾驭容器化技术的力量!! &am…

json和pickle模块

目录 ❤ json和pickle模块 序列化 json pickle python从小白到总裁完整教程目录:https://blog.csdn.net/weixin_67859959/article/details/129328397?spm1001.2014.3001.5502 ❤ json和pickle模块 序列化 把对象(变量)从内存中变成可存储或传输的过程称之为序列化&am…

3D EXPERIENCE“热知识” | 如何使用3D EXPERIENCE平台上的问题管理?

3D EXPERIENCE 平台上的问题管理对任何组织都是有用的工具,无论其规模大小。无论是使用它来标记和分发PDF还是在车间和工程部门之间分享想法,问题管理都可以简化日常活动。简而言之,它会根据权限列出现有问题,并让用户创建新问题&…

软件测试----软件测试四大测试过程

1、测试分析 (1)要点 1)软件需求分析 2)测试需求项的提取 3)用户使用场景分析 4)测试工具的调研和选取 5)测试缺陷分析 (2)分工 1)测试人员:提…

09:mysql---事务

目录 1:事务简介 2:事务操作 3:事务四大特性 4:并发事务问题 5:事务隔离级别 1:事务简介 事务 是一组操作的集合,它是一个不可分割的工作单位,事务会把所有的操作作为一个整体一起向系统提交或撤销操作请求,即这些操作要么同时成功&…

周赛347(模拟、思维题、动态规划+优化)

文章目录 周赛347[2710. 移除字符串中的尾随零](https://leetcode.cn/problems/remove-trailing-zeros-from-a-string/)模拟 [2711. 对角线上不同值的数量差](https://leetcode.cn/problems/difference-of-number-of-distinct-values-on-diagonals/)模拟 [2712. 使所有字符相等…

索引下推(Index Condition Pushdown)

使用一张用户表t_user,表里创建联合索引(name, age)。 如果现在有一个需求:检索出表中名字第一个字是张,而且年龄是10岁的所有用户。那么,SQL语句是这么写的: 复制代码 select * from tuser w…

【教学类-35-01】带笔画步骤图的描字(姓氏)(A4整张)

作品展示: 1、图片一行(0-6):文字简单,写3*412个字 2、图片2行(6-12):笔画适中,写3*39个字 3、图片3行(12-18):笔画适中,…

LeetCode刷题(ACM模式)-03哈希表

参考引用:代码随想录 注:每道 LeetCode 题目都使用 ACM 代码模式,可直接在本地运行,蓝色字体为题目超链接 0. 哈希表理论基础 0.1 哈希表 哈希表(Hash table,也称散列表)是根据关键码的值而直…

Redis(六)主从模式与哨兵机制

文章目录 一、主从模式配置一主二从集群 二、哨兵机制哨兵模式演示:哨兵如何监控节点「主观下线」与[客观下线]哨兵如何选新主节点由哪个哨兵进行转移如何通知客户端新主节点的信息? 一、主从模式 配置一主二从集群 开启三个linux,并安装redis info …

【k8s】【Prometheus】【待写】

环境 k8s v1.18.0 192.168.79.31 master 192.168.79.32 node-1 192.168.79.33 node-2一、Prometheus 对 kubernetes 的监控 1.1 node-exporter 组件安装和配置 node-exporter 可以采集机器(物理机、虚拟机、云主机等)的监控指标数据,能够采…