ThingsBoard规则链节点:Kafka 节点详解

news2025/1/15 7:10:06
引言

ThingsBoard 是一个开源的物联网平台,提供了设备管理、数据收集、处理和可视化等功能。规则链是 ThingsBoard 中的一个强大功能,允许用户定义复杂的业务逻辑来处理设备上报的数据。在规则链中,Kafka 节点用于将消息发送到 Apache Kafka 集群。

ThingsBoard从入门到实战课程,深入透析底层原理,快速搭建自己的IOT平台_哔哩哔哩_bilibiliThingsBoard从入门到实战课程,深入透析底层原理,快速搭建自己的IOT平台共计36条视频,包括:1、ThingsBoard项目介绍、2、ThingsBoard前端Vue版本代码编译、3、ThingsBoard本地后端源码编译等,UP主更多精彩视频,请关注UP账号。icon-default.png?t=O83Ahttps://www.bilibili.com/video/BV1CH36egEDM/?spm_id_from=333.999.0.0

1. Kafka 节点简介

Kafka 节点的主要作用是在规则链执行过程中,将消息发布到 Apache Kafka 的主题(Topic)。Apache Kafka 是一个分布式流处理平台,它被设计为高吞吐量、低延迟的消息传递系统。通过使用 Kafka 节点,可以将设备上报的数据或处理结果发送到 Kafka 主题,再由其他服务或系统消费这些消息,实现数据的进一步处理和分析。

2. 节点配置
  • Bootstrap Servers:指定 Kafka 集群的地址列表。
  • Topic:指定要发布的 Kafka 主题名称。
  • Key:可选参数,指定消息的键值。
  • Value:指定要发送的消息内容,可以是静态文本或动态变量。
  • Headers:可选参数,指定消息头信息。
  • Security Protocol:指定安全协议,例如 PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL 等。
  • SASL Mechanism:如果使用 SASL 认证,需要指定认证机制,如 PLAINSCRAM-SHA-256 等。
  • Credentials:如果使用 SASL 认证,需要提供用户名和密码。
2.1 基本配置示例
{
  "bootstrapServers": "localhost:9092",
  "topic": "device-data",
  "key": "${msg.deviceId}",
  "value": "${msg.data}",
  "headers": [
    {
      "key": "source",
      "value": "thingsboard"
    }
  ],
  "securityProtocol": "PLAINTEXT"
}
3. 使用场景

Kafka 节点在多种场景下都非常有用,特别是在需要将设备数据发送到 Kafka 集群进行进一步处理和分析的场景中。以下是一些具体的应用场景:

3.1 数据传输

在需要将设备上报的数据发送到 Kafka 集群时,可以通过 Kafka 节点将数据发送到指定的主题。

{
  "bootstrapServers": "localhost:9092",
  "topic": "device-data",
  "key": "${msg.deviceId}",
  "value": "${msg.data}"
}
3.2 数据分析

在需要对设备数据进行实时分析时,可以通过 Kafka 节点将数据发送到 Kafka 集群,再由 Apache Flink, Apache Spark 或其他流处理框架进行处理和分析。

{
  "bootstrapServers": "localhost:9092",
  "topic": "data-analysis",
  "key": "${msg.deviceId}",
  "value": "${msg.data}"
}
3.3 事件通知

在需要发送事件通知时,可以通过 Kafka 节点将事件消息发送到 Kafka 主题,再由其他服务或系统消费这些消息进行通知。

{
  "bootstrapServers": "localhost:9092",
  "topic": "event-notification",
  "key": "${msg.deviceId}",
  "value": "${msg.eventType}"
}
3.4 日志记录

在需要记录设备日志时,可以通过 Kafka 节点将日志消息发送到 Kafka 主题,再由日志管理系统进行处理和存储。

{
  "bootstrapServers": "localhost:9092",
  "topic": "device-logs",
  "key": "${msg.deviceId}",
  "value": "${msg.logMessage}"
}
4. 实际项目中的应用

下面是一个实际项目中的例子,展示如何在智能家居系统中使用 Kafka 节点。

4.1 项目背景

假设我们正在开发一个智能家居系统,该系统需要支持用户通过手机应用控制家中的灯光、空调等设备,并记录设备的状态和使用情况。此外,还需要将设备数据发送到 Kafka 集群进行进一步处理和分析。

4.2 项目需求
  • 记录设备的状态,例如当前温度、湿度等。
  • 记录设备的使用情况,例如开关次数、能耗等。
  • 实现实时反馈,确保用户能够及时了解操作结果。
  • 将设备数据发送到 Kafka 集群进行进一步处理和分析。
4.3 实现步骤
  1. 部署设备

    • 在家中安装智能灯光、空调等设备,并连接到 ThingsBoard 平台。
  2. 创建规则链

    • 添加 Kafka 节点,用于将设备上报的数据发送到 Kafka 集群。
    • 添加其他处理节点,如设备控制、状态查询和数据存储。
  3. 配置规则链

    • 配置 Kafka 节点,用于发送设备数据。
{
  "bootstrapServers": "localhost:9092",
  "topic": "device-data",
  "key": "${msg.deviceId}",
  "value": "${msg.data}"
}
  1. 处理数据
    • 根据业务逻辑,动态地将设备数据发送到 Kafka 集群。
// 发送设备数据到 Kafka
public void sendDeviceDataToKafka(String bootstrapServers, String topic, String deviceId, String data) {
    // 配置 Kafka 节点
    JsonNode config = JsonNodeFactory.instance.objectNode()
        .put("bootstrapServers", bootstrapServers)
        .put("topic", topic)
        .put("key", deviceId)
        .put("value", data);

    kafkaNode.sendMessage(config);
}
  1. 前端界面

    • 开发一个前端界面,显示设备的状态和使用情况。
    • 提供一个界面,让用户能够查看和管理设备的状态和使用情况,以及接收通知。
  2. 数据查询

    • 使用 SQL 查询,获取设备的状态和使用情况。
SELECT * FROM device_status WHERE device_id = 'device1' ORDER BY timestamp DESC LIMIT 10;
SELECT * FROM device_usage WHERE device_id = 'device1' ORDER BY timestamp DESC LIMIT 10;
5. 总结

Kafka 节点在 ThingsBoard 规则链中是一个非常有用的工具,可以帮助你将消息发布到 Kafka 主题,实现数据的进一步处理和分析。通过合理地使用 Kafka 节点,可以在数据传输、数据分析、事件通知和日志记录等场景中,确保系统的高效性和灵活性。

 🌐 项目地址

Things Vueicon-default.png?t=O83Ahttp://thingsvue.tpson.cn:7772/#/login?redirect=/tb-home/index

账号:admin@thingsboard.org 
密码:admin123456

🎽 安装使用

获取项目代码:

代码地址icon-default.png?t=O83Ahttps://gitee.com/tpsonwell_admin/thingsvue

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2252680.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Java Springboot个人财务APP且微信小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA/eclipse 微信…

阿里云 Elastic Enterprise 正式上线!

在数据驱动的商业环境中,企业面临着日益复杂的数据管理与分析挑战。阿里云Elasticsearch服务不仅免费提供了 Elastic 原厂的 Enterprise 版本功能,更凭借其增强的数据管理能力、智能AI分析、先进的搜索技术以及全面的安全特性,致力于为企业提…

1201作业

思维导图 作业 头函数 #include <myhead.h> #include"linklist.h" int main(int argc, const char *argv[]) {//调用创建链表函数node_ptr L list_create();if(NULL L){return -1;}//调用头插函数list_insert_head(L,Q);list_insert_head(L,W);list_insert…

【Code First】.NET开源 ORM 框架 SqlSugar 系列

.NET开源 ORM 框架 SqlSugar 系列 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列【Code First】.NET开源 ORM 框架 SqlSugar 系列【数据事务…

大语言模型微调与 XTuner 微调实战

1 大语言模型微调 1.1 什么是微调 大语言模型微调&#xff08;Fine-tuning of Large Language Models&#xff09;是指在预训练的大型语言模型基础上&#xff0c;使用特定任务的数据进一步训练模型&#xff0c;以使其更好地适应和执行特定任务的过程&#xff0c;用于使LLM&am…

Vulnhub靶场 Matrix-Breakout: 2 Morpheus 练习

目录 0x00 准备0x01 主机信息收集0x02 站点信息收集0x03 漏洞查找与利用1. 文件上传2. 提权 0x04 总结 0x00 准备 下载连接&#xff1a;https://download.vulnhub.com/matrix-breakout/matrix-breakout-2-morpheus.ova 介绍&#xff1a; This is the second in the Matrix-Br…

基于hexo框架的博客搭建流程

这篇博文讲一讲hexo博客的搭建及文章管理&#xff0c;也算是我对于暑假的一个交代 &#xff01;&#xff01;&#xff01;注意&#xff1a;下面的操作是基于你已经安装了node.js和git的前提下进行的&#xff0c;并且拥有github账号 创建一个blog目录 在磁盘任意位置创建一个…

24.12.02 Element

import { createApp } from vue // 引入elementPlus js库 css库 import ElementPlus from element-plus import element-plus/dist/index.css //中文语言包 import zhCn from element-plus/es/locale/lang/zh-cn //图标库 import * as ElementPlusIconsVue from element-plus/i…

vxe-table 设置树表格斑马线条纹样式

vxe-table 设置斑马线条纹样式&#xff0c;通过设置 stripe 参数 官网&#xff1a;https://vxetable.cn 表格 斑马线条纹&#xff0c;通过设置 stripe 参数 <template><div><vxe-grid v-bind"gridOptions"></vxe-grid></div> </…

力扣3366.最小数组和

力扣3366.最小数组和 题目 题目解析及思路 题目要求对于数组进行两种操作&#xff0c;使最终数组和最小 注意&#xff1a;每个元素可以同时执行两种操作 考虑动归&#xff0c;暴力的遍历每种情况 代码 记忆化搜索 class Solution { public:// minArraySum 函数用于计算在…

缓存穿透,缓存雪崩,缓存击穿

缓存穿透&#xff1a; 客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样的缓存永远不会生效&#xff0c;这些请求会直接打到数据库中&#xff0c;造成数据库压力过大 解决方法&#xff1a;1.缓存空对象 //TODO 此方法中解决了缓存穿透问题&#xff08;使用了缓存…

【C++boost::asio网络编程】有关异步读写api的笔记

异步读写api 异步写操作async_write_someasync_send 异步读操作async_read_someasync_receive 定义一个Session类&#xff0c;主要是为了服务端专门为客户端服务创建的管理类 class Session { public:Session(std::shared_ptr<asio::ip::tcp::socket> socket);void Conn…

atcoder abc 382 lazy_tag线段树

A Daily Cookie 代码&#xff1a; #include <bits/stdc.h> using namespace std;typedef long long ll;int main() {int n, d;cin >> n >> d;string s;cin >> s;int cnt d;for(auto t: s) if(t .) cnt ;cout << min(n, cnt); } B Daily Co…

【NLP 8、normalization、sigmoid,softmax归一化函数】

"燃尽最后的本能&#xff0c;意志力会带你杀出重围" —— 24.12.2 1. Normalization&#xff08;归一化&#xff09; 归一化是将数据转换为具有统一尺度的形式&#xff0c;通常用于数据预处理阶段。常见的归一化方法包括 Min-Max归一化、Z-Score 归一化和 L…

深入学习指针(5)!!!!!!!!!!!!!!!

文章目录 1.回调函数是什么&#xff1f;2.qsort使用举例2.1使用qsort函数排序整形数据2.2使用sqort排序结构数据 3.qsort函数的模拟实现 1.回调函数是什么&#xff1f; 回调函数就是⼀个通过函数指针调⽤的函数。 如果你把函数的指针&#xff08;地址&#xff09;作为参数传递…

Matlab Simulink 电力电子仿真-单相电压型半桥逆变电路分析

目录 一、单相电压型半桥逆变电路仿真模型 1.电路模型 2.电路模型参数 二、仿真分析 三、总结 1.优缺点 2.应用场景 一、单相电压型半桥逆变电路仿真模型 1.电路模型 单相电压型半桥逆变电路是一种常见的逆变电路&#xff0c;主要用于将直流电源转换为交流电源。 &…

《Vue零基础入门教程》第十五课:样式绑定

往期内容 《Vue零基础入门教程》第六课&#xff1a;基本选项 《Vue零基础入门教程》第八课&#xff1a;模板语法 《Vue零基础入门教程》第九课&#xff1a;插值语法细节 《Vue零基础入门教程》第十课&#xff1a;属性绑定指令 《Vue零基础入门教程》第十一课&#xff1a;事…

做异端中的异端 -- Emacs裸奔之路5: 条件反射式移动

移动命令使用频率非常之高&#xff0c;只要方法多一个小小的弯路&#xff0c;对使用体验影响都很大。 克服移动上的难度&#xff0c;离掌握Emacs就不远了。 在不安装其它包的情况下&#xff0c;Emacs就可以&#xff1a; 以行为单位移动&#xff1a; C-n/C-p以段落为单位移动&…

基于单片机的WIFI、语音、储存、时钟、闹钟、定位系统

所有仿真详情导航&#xff1a; PROTEUS专栏说明-CSDN博客 目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机&#xff0c;采用DS1302时钟模块&#xff0c;通过LCD1602显示实时时间&#xff0c;也可以储存时间在AT2DC02中&#xff0c…

【阅读记录-章节4】Build a Large Language Model (From Scratch)

文章目录 4. Implementing a GPT model from scratch to generate text4.1 Coding an LLM architecture4.1.1 配置小型 GPT-2 模型4.1.2 DummyGPTModel代码示例4.1.3 准备输入数据并初始化 GPT 模型4.1.4 初始化并运行 GPT 模型 4.2 Normalizing activations with layer normal…