Table of Contents
1. Overview
2. Solution Design
2.1 Resources
2.2 Topics
3. Implementation Steps
3.1 NiFi Canvas
3.2 MqttToKafka
3.2.1 Configuration
3.2.2 Test
3.2.3 Result
3.3 KafkaToMqtt
3.3.1 Configuration
3.3.2 Test
3.3.3 Result
4. Summary
4.1 Key Points
NiFi Kafka Processor configuration dictionary
Topic wildcards
5. References
1. Overview
While putting together several solutions I kept needing a kind of adapter that could connect different data sources and funnel them into one place for unified processing. That search led to Apache NiFi (official site), which works like a toolbox of "data pipes plus connectors" and links heterogeneous data sources smoothly. The overall idea is to funnel all kinds of sources, such as log files, messaging, and database operations, into Kafka for centralized processing. First impressions have been very good, so this post shares the setup.
2. Solution Design
- Connect an EMQX cluster (MQTT broker) with a Kafka cluster so that data flows in both directions (full duplex): client (MQTT) <=> application service (Kafka).
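A minimal sketch of the two flows, using the processor names configured in section 3:

MQTT client -> EMQX -> ConsumeMQTT -> PublishKafka_2_0 -> Kafka -> application service
application service -> Kafka -> ConsumeKafka_2_0 -> PublishMQTT -> EMQX -> MQTT client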
2.1 Resources
For simplicity everything runs in a Docker environment for now; migration to K8s can come later.

| Service | Endpoint | Notes |
|---|---|---|
| MQTT (EMQX) | tcp://host001.dev.ia:1883 (or mqtt://) | client IDs: nifi-xio1-sub1 (subscriber), nifi-xio1-pub1 (publisher) |
| Kafka | host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092 | |
| Apache NiFi | http://host001.dev.ia:9080/nifi/ | |
NiFi Docker configuration:

# Create a volume to persist data
docker volume create nifi_data

docker-compose.yml:
version: "3.7"
services:
  nifi:
    image: apache/nifi:1.9.2
    container_name: nifi
    restart: always
    ports:
      - "9080:8080"
    environment:
      - NIFI_WEB_HTTP_HOST=0.0.0.0
      #- NIFI_HOME=/home/nifi
      #- NIFI_LOG_DIR=/home/nifi/logs
    volumes:
      - nifi_data:/home/nifi
volumes:
  nifi_data:
    external: true
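To bring the container up and watch it start (standard Docker Compose commands; wait until the logs show the web UI is listening before opening the address above):

docker-compose up -d
docker logs -f nifi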
2.2 Topics

| Topic | Notes |
|---|---|
| test.topic.nifi1 | receiving test |
| test.topic.bus | bus topic |
| test.device.* | wildcard topic test |
| test.device.mw3039kkj001 | topic carrying a device id |
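If automatic topic creation is disabled on the Kafka cluster, the Kafka-side topics can be created up front with the stock kafka-topics.sh tool; a sketch (partition and replication counts are assumptions, adjust them to the cluster):

kafka-topics.sh --bootstrap-server host001.dev.ia:19092 --create --topic test.topic.nifi1 --partitions 3 --replication-factor 3
kafka-topics.sh --bootstrap-server host001.dev.ia:19092 --create --topic test.topic.bus --partitions 3 --replication-factor 3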
3. Implementation Steps
3.1 NiFi Canvas
Once the container is up, open http://host001.dev.ia:9080/nifi/. The canvas holds two configured Process Groups, MqttToKafka and KafkaToMqtt, one for each direction of the flow.
3.2 MqttToKafka
3.2.1 Configuration
Add a ConsumeMQTT processor: drag the Processor component onto the canvas and select ConsumeMQTT.

| Setting | Value |
|---|---|
| Name | ConsumeMQTT |
| Automatically terminate relationships | check failure / success |

| Property | Value |
|---|---|
| Name | ConsumeMQTT |
| Broker URI | tcp://host001.dev.ia:1883 |
| Client ID | nifi-xio1-sub1 |
| Username/Password | -- |
| Topic Filter | test.topic.nifi1 |
| Max Queue Size | 1000 |
Add a PublishKafka_2_0 processor: drag the Processor component onto the canvas and select PublishKafka_2_0.

| Property | Value |
|---|---|
| Name | PublishKafka_2_0 |
| Kafka Brokers | host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092 |
| Security Protocol | PLAINTEXT |
| Topic Name | test.topic.nifi1 |
| Delivery Guarantee | Guarantee Replicated Delivery |
| Use Transactions | true |

Drag a connection from ConsumeMQTT to PublishKafka_2_0; this adds a queue connection named Message.
When running correctly it looks like this:
3.2.2 Test
Test plan:
- Use the MQTT client tool MqttX to publish a JSON payload to topic test.topic.nifi1 (a paho-mqtt publisher sketch is given below as an alternative).
- Use a Python script as a Kafka consumer client, subscribe to topic test.topic.nifi1, and receive that payload.
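If MqttX is not at hand, a minimal publisher sketch with the paho-mqtt library can drive the same test (the client id nifi-test-pub1 and the payload are assumptions, not part of the NiFi configuration):

# pip install 'paho-mqtt<2'
import json
import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="nifi-test-pub1")  # assumed client id for the test publisher
client.connect("host001.dev.ia", 1883)
client.loop_start()
# Publish one JSON payload to the topic that ConsumeMQTT filters on
info = client.publish("test.topic.nifi1", json.dumps({"key": "value1a"}), qos=0)
info.wait_for_publish()
client.loop_stop()
client.disconnect()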
Python consumer script:
from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json


async def consume_loop(consumer, topics):
    try:
        # Subscribe to the topics
        consumer.subscribe(topics)
        while True:
            # Poll for messages
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(
                        "%% %s [%d] reached end at offset %d\n"
                        % (msg.topic(), msg.partition(), msg.offset())
                    )
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                # Normal message
                raw_message = msg.value()
                print(f"Raw message: {raw_message}")
                parsed_message = json.loads(raw_message.decode("utf-8"))
                print(f"Received message: {type(parsed_message)} : {parsed_message}")
            await asyncio.sleep(0.01)  # Sleep briefly to yield control
    finally:
        # Close the consumer
        consumer.close()


async def consume():
    # Consumer configuration
    conf = {
        "bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",
        "group.id": "mygroup1",
        "auto.offset.reset": "earliest",
    }
    # Create the consumer
    consumer = Consumer(conf)
    await consume_loop(consumer, ["test.topic.nifi1"])


if __name__ == "__main__":
    asyncio.run(consume())
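To run it (consumer.py is just an assumed file name for the script above):

pip install confluent-kafka
python consumer.py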
3.2.3 Result
Screenshots: consumer script output and the NiFi flow.
3.3 KafkaToMqtt
3.3.1 Configuration
Add a ConsumeKafka_2_0 processor: drag the Processor component onto the canvas and select ConsumeKafka_2_0.

| Setting | Value |
|---|---|
| Name | ConsumeKafka_2_0 |

| Property | Value |
|---|---|
| Kafka Brokers | host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092 |
| Topic Name(s) | test.topic.bus / test.device.* |
| Group ID | test |
Add a PublishMQTT processor: drag the Processor component onto the canvas and select PublishMQTT.

| Setting | Value |
|---|---|
| Name | PublishMQTT |
| Automatically terminate relationships | check failure / success |

| Property | Value |
|---|---|
| Broker URI | tcp://host001.dev.ia:1883 |
| Client ID | nifi-xio1-pub1 |
| Username/Password | -- |
| Topic | test.topic.bus |
| QoS | 0 |

Drag a connection from ConsumeKafka_2_0 to PublishMQTT; this adds a queue connection named Message.
When running correctly it looks like this:
3.3.2 Test
Test plan:
- A Python script publishes messages to Kafka on topic test.topic.bus (the script below actually publishes to test.device.mw3039kkj001, with the test.topic.bus call commented out).
- An MqttX client subscribes and receives them (a paho-mqtt subscriber sketch follows the script as an alternative).
Producer script:
from confluent_kafka import Producer
import json


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def create_async_producer(config):
    """Creates an instance of an asynchronous Kafka producer."""
    return Producer(config)


def produce_messages(producer, topic, messages):
    """Asynchronously produces messages to a Kafka topic."""
    for message in messages:
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)
        # Asynchronously produce a message; the delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        producer.produce(
            topic, json.dumps(message).encode("utf-8"), callback=delivery_report
        )
    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()


if __name__ == "__main__":
    # Kafka configuration
    # Replace these with your server's configuration
    conf = {
        "bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",
        # "client.id": "python-producer",
    }
    # Create an asynchronous Kafka producer
    async_producer = create_async_producer(conf)
    # Messages to send to Kafka
    messages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]
    # Produce messages
    # produce_messages(async_producer, "test.topic.bus", messages_to_send)
    produce_messages(async_producer, "test.device.mw3039kkj001", messages_to_send)
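As an alternative to MqttX on the receiving side, a minimal subscriber sketch with the paho-mqtt library (the client id nifi-test-sub1 is an assumption; it subscribes to the two topics used above):

# pip install 'paho-mqtt<2'
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    # Print every message that the KafkaToMqtt flow forwards to EMQX
    print(f"{msg.topic}: {msg.payload.decode('utf-8')}")

client = mqtt.Client(client_id="nifi-test-sub1")  # assumed client id for the test subscriber
client.on_message = on_message
client.connect("host001.dev.ia", 1883)
client.subscribe([("test.topic.bus", 0), ("test.device.mw3039kkj001", 0)])
client.loop_forever()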
3.3.3 Result
Screenshots: messages received in MqttX and the NiFi flow.
4. Summary
NiFi also supports clustered deployment, so everything from data collection through data movement to data storage can run distributed, and the visual UI makes it easy to wire up, add, or remove data nodes. This has only scratched the surface; deeper study will be filled in and refined in later posts.
4.1 Key Points
NiFi Kafka Processor configuration dictionary:

| Property | Meaning |
|---|---|
| Delivery Guarantee | Delivery guarantee for records sent to Kafka |
| Use Transactions | Whether to publish with Kafka transactions (true / false) |
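For comparison, roughly equivalent settings on a plain Kafka producer client; a sketch with confluent_kafka (the transactional id is an assumed value for illustration):

from confluent_kafka import Producer

# acks=all is roughly what "Guarantee Replicated Delivery" asks of the broker;
# transactional.id enables transactions, similar in spirit to "Use Transactions" = true.
producer = Producer({
    "bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",
    "acks": "all",
    "transactional.id": "demo-tx-1",  # assumed id
})
producer.init_transactions()
producer.begin_transaction()
producer.produce("test.topic.bus", b'{"key": "tx-value"}')
producer.commit_transaction()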
MQTT topic wildcards:

| Symbol | Name | Description |
|---|---|---|
| "/" | Topic level separator | Splits a topic name into multiple topic levels. |
| "#" | Multi-level wildcard | Matches any number of levels. A client subscribed to "china/guangzhou/#" receives messages published under any topic below that prefix. |
| "+" | Single-level wildcard | Matches exactly one topic level; it may appear at any level of a topic filter, including the first and the last. |
| "$" | Reserved prefix | Topic names beginning with "$" (e.g. "$SYS/...") are reserved for the broker and are not matched by "#" or "+" at the first level. |
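A small hand-rolled matcher, just to illustrate how "+" and "#" filters match (simplified sketch: it ignores the "$" reservation rule):

def mqtt_topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter using '+' / '#' matches a topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # '#' matches this level and everything below it
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)

print(mqtt_topic_matches("china/guangzhou/#", "china/guangzhou/tianhe/001"))  # True
print(mqtt_topic_matches("china/+/001", "china/guangzhou/001"))               # True
print(mqtt_topic_matches("china/+/001", "china/guangzhou/tianhe/001"))        # False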
5. References
- https://zhuanlan.zhihu.com/p/628628189
- https://zhuanlan.zhihu.com/p/697301397
- https://blog.51cto.com/u_16213319/7344183
- Apache NiFi Docker Compose | All About
- https://blog.51cto.com/u_16099203/10959511
- 大数据NiFi(二十一):监控日志文件生产到Kafka (Tencent Cloud Developer Community)
- PublishMQTT