Ubuntu TDengine 集成 EMQX 通过规则引擎实现设备数据直接入库

news2025/2/22 14:33:22

背景

曾使用过 IoTDB 自带的 MQTT Broker 实现了设备数据入库,那么使用 TDengine 时,我们可以借助 EMQX (一款优秀的国产开源 MQTT Broker )的规则引擎结合 TDengine 的 RESTful API 完成设备数据的路由与入库。

  • 用到的工具
  1. TDengine RESTful API
  2. EMQX 规则引擎
  3. TDengine GUI图形化管理工具
  4. Node.js下的MQTT客户端
  5. 虚拟机CentOS操作系统
  • 版本信息
  1. TDengine: 2.2.0.0
  2. EMQX: 4.2.4
  3. Node.js: 12.16.1
  4. CentOS: 7

TDengine创建数据库表

create database if not exists ok;

create stable if not exists ok.power(ts timestamp, voltage int, current float, temperature float) tags(sn int, city nchar(64), groupid int);

create table if not exists ok.device1 using ok.power tags(1, "太原", 1);
create table if not exists ok.device2 using ok.power tags(2, "西安", 2);

insert into ok.device1 values("2021-09-04 21:03:38.734", 1, 1.0, 1.0);
insert into ok.device2 values("2021-09-04 21:03:40.734", 2, 2.0, 2.0);

初始数据如下:

2021-09-23-InitialData.jpg

EMQX创建资源

所谓的资源就是将要连接的数据库、中间件等,这里便是 TDengine 的连接,通过其 RESTful API 建立连接,在规则引擎的动作响应中会用到这里的资源。

2021-09-23-TDengineResource.jpg

2021-09-23-ResourceView.jpg

其中关于头信息中的 Authorization 通过以下方式获得。

# 获取token
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl hadoop1:6041/rest/login/root/taosdata
{"status":"succ","code":0,"desc":"/KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04"}

# 测试:附加自定义token在头信息,正常响应
cxzx-t580@Heartsuit MINGW64 /d/IoT
$ curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'select * from ok.power' hadoop1:6041/rest/sql
{"status":"succ","head":["ts","voltage","current","temperature","sn","city","groupid"],"column_meta":[["ts",9,8],["voltage",4,4],["current",6,4],["temperature",6,4],["sn",4,4],["city",10,64],["groupid",4,4]],"data":[["2021-09-04 21:03:38.734",1,1.00000,1.00000,1,"太原",1],["2021-09-04 21:03:40.734",2,2.00000,2.00000,2,"西安",2]],"rows":2}

EMQX创建规则

  • 创建规则:这里直接从主题device/sn中获取payload,结果命名为power

2021-09-23-RuleContent.jpg

  • 测试规则:模拟一条数据,经过测试,定义的规则成功命中。

2021-09-23-RuleTest.jpg

EMQX创建动作响应

当命中数据后,我们的目标是将其存入数据库,那么我们一开始定义的 TDengine 资源就派上用场了。

  1. Action选择Data to Web Server表示我们要将数据发送至Web服务(即 TDengine 的 RESTful API
  2. Resource选择我们创建好的资源
  3. 最后填写Payload Template,写入数据表的SQL语句,这里支持插值:insert into ok.device${power.sn} values ('${power.ts}', ${power.voltage}, ${power.currente}, ${power.temperature})

2021-09-23-Action.jpg

Node.js模拟MQTT客户端

这里通过 Node.js 模拟一个设备,向主题 device/sn 随机发布数据,完成数据上报,当然也可以借助其他客户端来实现。

2021-09-23-NodeClient.jpg

EMQX查看规则引擎Metrics

点击 Rule 菜单下的规则引擎 ID ,可查看已配置的规则详情,还可以看到多少消息被规则命中的度量信息(需刷新页面)。

2021-09-23-Metrics.jpg

TDengine客户端查看数据

数据库中确认写入两条新数据:

2021-09-23-FinalData.jpg

规则引擎扩展

开源版的 EMQX Broker 除了全面支持 MQTT5 新特性、多协议支持外,更强大的地方在于其围绕 MQTT 周边提供了一系列的 WebHook 、 HTTP API 接口以及最为核心的规则引擎。上面我们只是通过主题选择了数据进行规则匹配,其实规则引擎还可以结合一系列的内部事件,编写规则时以$开头,包括客户端连接事件、断开事件、消息确认事件、消息发布事件、订阅事件、取消订阅事件等。

2021-09-23-RuleAdvanced.jpg
EMQX Broker 一开始的定位就是物联网消息中间件,目前开源版本功能已经非常强大,而企业版本与Cloud版本更是提供了高阶功能,全托管、更稳定、更可靠,技术支持更及时。以下是我试用的Cloud版本。

2021-09-23-Cloud.jpg

可能遇到的问题

  • 端口开放问题
    因为通过宿主机访问虚拟机,所以记得关闭防火墙或者开放对应的端口,这里涉及到的端口有:
  1. 6041:TDengine的RESTful API默认端口
  2. 1883:EMQX的MQTT默认端口
  3. 18083:EMQX的Dashboard默认端口
# 关闭防火墙
[root@hadoop1 ~]# systemctl stop firewalld.service

# 放行端口
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 6041 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 1883 -j ACCEPT
[root@hadoop1 ~]# iptables -I INPUT -p TCP --dport 18083 -j ACCEPT
  • 主题名称不匹配导致规则无法命中

作为约定俗成的实践,一般在编码时 MQTT 的主题不以 / 开头,即写作 device/sn ,而不是 /device/sn 。

刚开始我在 MQTT 客户端发送数据时主题名为 /device/sn ,而规则引擎中的主题为 device/sn ,导致无法匹配。

  • SQL模板中的字符串

这里的ts以字符串形式发送,因此需要将插值用引号括起来:‘${power.ts}’。否则 TDengine 日志报错:

[root@hadoop1 taos]# tailf ./log/taosdlog.0
09/23 08:42:11.707621 00001702 TSC ERROR 0x8e async result callback, code:Syntax error in SQL
09/23 08:42:11.707675 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, user:root, query error, code:Syntax error in SQL, sqlObj:0x7f5f74000c10
09/23 08:42:11.725687 00001696 HTP ERROR context:0x7f5f880008c0, fd:30, code:400, error:Syntax error in SQL
  • 规则引擎的Metrics计数与实际发送数据不符

这与客户端发送数据指定的 QoS 相关,如果 QoS = 1 ,则MQTT协议的重发机制可能导致数据重复发送。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/566841.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PAI-Diffusion中文模型全面升级,海量高清艺术大图一键生成

作者:段忠杰(终劫)、刘冰雁(伍拾)、汪诚愚(熊兮)、黄俊(临在) 背景 以Stable Diffusion模型为代表,AI生成内容(AI Generated Content&#xff…

鸿蒙Hi3861问题解决-[OHOS ERROR] clang not found, install it please

一、简介 在使用DevEco进行编译时出现[OHOS ERROR] clang not found, install it please问题,导致编译失败,这里做个问题记录。 二、解决 这种问题其实还是工具链安装不全造成的。 安装gn 这里用的是VSCode DevEco组件,里边包含了gn组件的安…

error while loading shared libraries: libc.so.6 误删除libc.so.6急救办法,

故障原因: 在高版本的环境中编译了一个软件,然而在低版本系统中无法使用,缺少libc.so支持,然而在编译过程中误删除了 libc.so.6的软连接,rm /lib64/libc.so.6 删除后发现系统好多命令都无法使用了,悲催&#xff01…

国际最高级别认可!赛宁网安荣获CMMI5国际认证

​​近日,经国际权威机构评估,南京赛宁信息技术有限公司(简称:赛宁网安)顺利通过国际软件领域最高级别的CMMI五级(简称CMMI5)认证。荣获CMMI5证书,标志着赛宁网安在软件研发、软件成…

Axure教程—动态单散点图(中继器)

本文将教大家如何用AXURE制作单散点图 一、效果介绍 如图: 预览地址:https://q79has.axshare.com 下载地址:https://download.csdn.net/download/weixin_43516258/87817717 二、功能介绍 简单填写中继器内容即可生成动态单散点图样式颜色等…

前端微服务无界实践 | 京东云技术团队

一、前言 随着项目的发展,前端SPA应用的规模不断加大、业务代码耦合、编译慢,导致日常的维护难度日益增加。同时前端技术的发展迅猛,导致功能扩展吃力,重构成本高,稳定性低。因此前端微服务应运而生。 前端微服务优势…

【源码篇】基于SSM+EasyUI开发的学生后台管理系统

系统介绍 一个基于SSM的学生管理系统:代码注释详细,逻辑结构清晰,对于初学 SSM 的同学非常具有参考,及学习价值哟! 数据库中默认的管理员身份信息 账户名:admin,密码:admin 用户权限介绍: 管理员:具有所有管理模块的操控权限。学生:仅具有学生信息管理模块的查…

智能AI抢了元宇宙的风头?

前几天,微博突然出现这么一条热搜。 #ChatGPT官方APP登录美国苹果应用商店 这绝对是一条相当火爆的新闻,因为这意味着智能聊天机器人ChatGPT终于有自己的App了。值得一提的是,ChatGPT不仅仅登录了美国苹果应用商店,而且下载量迅…

80个10倍提升Excel技能的ChatGPT提示

你是否厌倦了在使用Excel时感觉像个新手?你是否想将你的技能提升到更高的水平,成为真正的Excel大师?嗯,如果你正在使用ChatGPT,那么成为Excel专家简直易如反掌。 你只需要了解一些最有用的Excel提示,就能在…

打造高效互联网医院系统源码:解读其核心功能及应用

随着互联网的不断普及和发展,互联网医院系统已经成为了现代医疗服务的一个重要组成部分。本文将介绍互联网医院系统的核心功能以及其应用,并提供一些互联网医院系统开发代码。 互联网医院系统是一种基于互联网技术的医疗服务平台,可以通过网…

Springboot +spring security,实现session并发控制及实现原理分析

一.简介 在SpringSecurity中实现会话并发控制,只需要配置一个会话数量就可以了,先介绍下如何配置会话并发控制,然后再。介绍下SpringSecurity 如何实现会话并发控制。 二.创建项目 如何创建一个SpringSecurity项目,前面文章已经…

Hive ---- 函数

Hive ---- 函数 1. 函数简介2. 单行函数1. 算术运算函数2. 数值函数3. 字符串函数4. 日期函数5. 流程控制函数6. 集合函数7. 案例演示 3. 高级聚合函数案例演示 4. 炸裂函数1. 概述2. 案例演示 5. 窗口函数1. 概述2. 常用窗口函数3. 案例演示 6. 自定义函数7. 自定义UDF函数 1.…

Unity - 记一次非正规变体优化带来的兼容性导致部分手机卡死的问题

文章目录 问题但是我咨询过 公司中台TA大佬 - 2023.4.6然后咨询 unity 技术官方 - 2023.4.6再次遇到卡死 - 2023.5.24 解决方法具体华为真机上的 DEBUG 问题 在 2023.4.6 我们的 角色展示界面 就遇到了 华为手机,red mi note 11 的测试手机上的 后 2023.5.24 再次遇…

SSM框架学习之spring

Spring 以下是关于Spring Boot学习的一些文档和资源,希望对你有帮助: Spring Boot官方文档:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/ Spring Boot中文文档:https://www.springcloud.cc/spring-bo…

Server - 高性能的 PyTorch 训练环境配置 (PyTorch3D 和 FairScale)

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://blog.csdn.net/caroline_wendy/article/details/130863537 PyTorch3D 是基于 PyTorch 的 3D 数据深度学习库,提供了高效、模块化和可微分的组件,以简化 3D 深度学…

龙讯旷腾作为首批单位入驻北京昇腾人工智能计算中心

2023中关村论坛系列活动—北京人工智能产业创新发展大会圆满落幕,围绕北京AI产业发展,政产学研用各界大咖汇聚京城,中国科协副主席束为、北京市副市长于英杰、中国工程院院士廖湘科出席大会。会上,北京市门头沟区政府联合中关村发…

Kubernetes基础操作

K8S基础操作 ✨✨✨✨✨✨✨✨✨这个基础操作一切都基于各位把k8s搭建好哦,搭建的时候请一定一定一定(很重要),选定一个版本,能避免很多错,然后本章节就给大家介绍了k8s最基础的操作,有一些复杂…

基于GPTP时间同步(时钟同步服务器)技术助力智能驾驶应用

基于GPTP时间同步(时钟同步服务器)技术助力智能驾驶应用 基于GPTP时间同步(时钟同步服务器)技术助力智能驾驶应用 智能驾驶区域网关架构并未采用车载以太网总线进行连接,而是采用传统的 CAN 总线、FlexRay 或 MOST 总线…

解决若依出现Error: Cannot find module ‘@/views/xxx‘问题

问题描述: 若依 vue 版菜单点不开,报错:Error: Cannot find module ‘/views/xxx’ 。后台、vue前端启动都没问题。但是左侧菜单点不开,一直在加载中。 原因: 路由懒加载,webpack版本问题,we…

常见淘宝API文档接口使用攻略,一文搞定

探索淘宝数据的奥秘,淘宝是目前国内最大的B2C电商平台之一,每天都会产生海量的数据。借助淘宝API技术文档,我们可以轻松地获取到这些数据,从而为电商运营和数据分析提供有力支持。 1.什么是淘宝API? 淘宝API&#xf…