引言
ThingsBoard 是一个开源的物联网平台,提供了设备管理、数据收集、处理和可视化等功能。规则链是 ThingsBoard 中的一个强大功能,允许用户定义复杂的业务逻辑来处理设备上报的数据。在规则链中,
Kafka
节点用于将消息发送到 Apache Kafka 集群。
ThingsBoard从入门到实战课程,深入透析底层原理,快速搭建自己的IOT平台_哔哩哔哩_bilibiliThingsBoard从入门到实战课程,深入透析底层原理,快速搭建自己的IOT平台共计36条视频,包括:1、ThingsBoard项目介绍、2、ThingsBoard前端Vue版本代码编译、3、ThingsBoard本地后端源码编译等,UP主更多精彩视频,请关注UP账号。https://www.bilibili.com/video/BV1CH36egEDM/?spm_id_from=333.999.0.0
1. Kafka 节点简介
Kafka
节点的主要作用是在规则链执行过程中,将消息发布到 Apache Kafka 的主题(Topic)。Apache Kafka 是一个分布式流处理平台,它被设计为高吞吐量、低延迟的消息传递系统。通过使用 Kafka
节点,可以将设备上报的数据或处理结果发送到 Kafka 主题,再由其他服务或系统消费这些消息,实现数据的进一步处理和分析。
2. 节点配置
- Bootstrap Servers:指定 Kafka 集群的地址列表。
- Topic:指定要发布的 Kafka 主题名称。
- Key:可选参数,指定消息的键值。
- Value:指定要发送的消息内容,可以是静态文本或动态变量。
- Headers:可选参数,指定消息头信息。
- Security Protocol:指定安全协议,例如
PLAINTEXT
,SSL
,SASL_PLAINTEXT
,SASL_SSL
等。 - SASL Mechanism:如果使用 SASL 认证,需要指定认证机制,如
PLAIN
,SCRAM-SHA-256
等。 - Credentials:如果使用 SASL 认证,需要提供用户名和密码。
2.1 基本配置示例
{
"bootstrapServers": "localhost:9092",
"topic": "device-data",
"key": "${msg.deviceId}",
"value": "${msg.data}",
"headers": [
{
"key": "source",
"value": "thingsboard"
}
],
"securityProtocol": "PLAINTEXT"
}
3. 使用场景
Kafka
节点在多种场景下都非常有用,特别是在需要将设备数据发送到 Kafka 集群进行进一步处理和分析的场景中。以下是一些具体的应用场景:
3.1 数据传输
在需要将设备上报的数据发送到 Kafka 集群时,可以通过 Kafka
节点将数据发送到指定的主题。
{
"bootstrapServers": "localhost:9092",
"topic": "device-data",
"key": "${msg.deviceId}",
"value": "${msg.data}"
}
3.2 数据分析
在需要对设备数据进行实时分析时,可以通过 Kafka
节点将数据发送到 Kafka 集群,再由 Apache Flink, Apache Spark 或其他流处理框架进行处理和分析。
{
"bootstrapServers": "localhost:9092",
"topic": "data-analysis",
"key": "${msg.deviceId}",
"value": "${msg.data}"
}
3.3 事件通知
在需要发送事件通知时,可以通过 Kafka
节点将事件消息发送到 Kafka 主题,再由其他服务或系统消费这些消息进行通知。
{
"bootstrapServers": "localhost:9092",
"topic": "event-notification",
"key": "${msg.deviceId}",
"value": "${msg.eventType}"
}
3.4 日志记录
在需要记录设备日志时,可以通过 Kafka
节点将日志消息发送到 Kafka 主题,再由日志管理系统进行处理和存储。
{
"bootstrapServers": "localhost:9092",
"topic": "device-logs",
"key": "${msg.deviceId}",
"value": "${msg.logMessage}"
}
4. 实际项目中的应用
下面是一个实际项目中的例子,展示如何在智能家居系统中使用 Kafka
节点。
4.1 项目背景
假设我们正在开发一个智能家居系统,该系统需要支持用户通过手机应用控制家中的灯光、空调等设备,并记录设备的状态和使用情况。此外,还需要将设备数据发送到 Kafka 集群进行进一步处理和分析。
4.2 项目需求
- 记录设备的状态,例如当前温度、湿度等。
- 记录设备的使用情况,例如开关次数、能耗等。
- 实现实时反馈,确保用户能够及时了解操作结果。
- 将设备数据发送到 Kafka 集群进行进一步处理和分析。
4.3 实现步骤
-
部署设备:
- 在家中安装智能灯光、空调等设备,并连接到 ThingsBoard 平台。
-
创建规则链:
- 添加
Kafka
节点,用于将设备上报的数据发送到 Kafka 集群。 - 添加其他处理节点,如设备控制、状态查询和数据存储。
- 添加
-
配置规则链
- 配置
Kafka
节点,用于发送设备数据。
- 配置
{
"bootstrapServers": "localhost:9092",
"topic": "device-data",
"key": "${msg.deviceId}",
"value": "${msg.data}"
}
- 处理数据:
- 根据业务逻辑,动态地将设备数据发送到 Kafka 集群。
// 发送设备数据到 Kafka
public void sendDeviceDataToKafka(String bootstrapServers, String topic, String deviceId, String data) {
// 配置 Kafka 节点
JsonNode config = JsonNodeFactory.instance.objectNode()
.put("bootstrapServers", bootstrapServers)
.put("topic", topic)
.put("key", deviceId)
.put("value", data);
kafkaNode.sendMessage(config);
}
-
前端界面:
- 开发一个前端界面,显示设备的状态和使用情况。
- 提供一个界面,让用户能够查看和管理设备的状态和使用情况,以及接收通知。
-
数据查询:
- 使用 SQL 查询,获取设备的状态和使用情况。
SELECT * FROM device_status WHERE device_id = 'device1' ORDER BY timestamp DESC LIMIT 10;
SELECT * FROM device_usage WHERE device_id = 'device1' ORDER BY timestamp DESC LIMIT 10;
5. 总结
Kafka
节点在 ThingsBoard 规则链中是一个非常有用的工具,可以帮助你将消息发布到 Kafka 主题,实现数据的进一步处理和分析。通过合理地使用Kafka
节点,可以在数据传输、数据分析、事件通知和日志记录等场景中,确保系统的高效性和灵活性。
🌐 项目地址
Things Vuehttp://thingsvue.tpson.cn:7772/#/login?redirect=/tb-home/index
账号:admin@thingsboard.org 密码:admin123456
🎽 安装使用
获取项目代码:
代码地址https://gitee.com/tpsonwell_admin/thingsvue