Apache Nifi挂接MQTT与Kafka实践

news2025/1/12 12:00:42

目录

1.  说明:

2. 方案设计:

2.1 资源配置:

2.2 交互Topics:  

3. 实现步骤 

3.1 Nifi 桌面

3.2 MqttToKafka

3.2.1 配置

3.2.2 测试

3.2.3 结果

3.3 KafkaToMqtt

3.3.1 配置 

3.3.1 测试

3.3.1 结果 ​编辑

4. 总结:

4.1 知识点

Nifi Kafka Processor 配置字典:

Topic通配符:

5. 参考:


1.  说明:

      在一些方案实现过程中,感觉需要一种接驳器来连接不同的数据源并汇流到一处进行统一处理,于是寻到NIFI(官网)这个工具,它相当于“数据水管+接驳器工具箱”,能丝滑联结不同的数据源,总体思路是把各类数据源汇流到Kafka集中处理,比如日志文件,消息传递,数据库操作等。初步使用感觉很不错,分享之。

2. 方案设计:

- 连接Emqx集群(mqtt服务)与Kafka集群,实现数据流动的双工运作 - 客户端(连mqtt) <=> 应用服务(连kafka)

2.1 资源配置:

简单起见,在docker环境中实施,后续迁移到K8s

服务集群服务入口备注
MQTT (tcp|mqtt)://host001.dev.ia:1883

client id:

nifi-xio1-sub1 订阅者

nifi-xio1-pub1 发布者

Kafkahost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Apache Nifihttp://host001.dev.ia:9080/nifi/

Nifi的docker配置

# 建个卷,持久化数据
docker volume create nifi_data

docker-compose.yml

version: "3.7"
services:
  nifi:
    image: apache/nifi:1.9.2
    container_name: nifi
    restart: always
    ports:
      - "9080:8080"
    environment:
      - NIFI_WEB_HTTP_HOST=0.0.0.0
      #- NIFI_HOME=/home/nifi
      #- NIFI_LOG_DIR=/home/nifi/logs

    volumes:
      - nifi_data:/home/nifi

volumes:
  nifi_data:
    external: true
2.2 交互Topics:  
Topic备注
test.topic.nifi1测试接收
test.topic.bus总线
test.device.*测试通配符topic

test.device.mw3039kkj001

测试带设备id的topic

3. 实现步骤 

3.1 Nifi 桌面

配好后,访问​http://host001.dev.ia:9080/nifi/​, 中间是配好的两个Processor Group,分别是MqttToKafka与KafkaToMqtt,代表双向流动配置。

3.2 MqttToKafka
3.2.1 配置

 加ConsumeMQTT Processor:拉Processor组件下去,点开选ConsumeMQTT

Settings备注
NameConsumeMQTT
Automatically terminate relationships

failure / success 勾选

Properties备注
NameConsumeMQTT
Broker URItcp://host001.dev.ia:1883
Client IDnifi-xio1-sub1
Username/Password--
Topic Filtertest.topic.nifi1
Max Queue Size1000

加PublishKafka_2_0 Processor:拉Processor组件下去,点开选PublishKafka_2_0

Properties备注
NamePublishKafka_2_0
Kafka Brokershost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Security ProtocolPLAINTEXT
Topic Nametest.topic.nifi1
Delivery Guarantee

Guarantee Replicated Delivery

Use Transactionstrue

拖动ConsumeMQTT连接PublishKafka, 会添加一个队列连接组件命名为 Message

正确运行如图:

3.2.2 测试

说明:

  1. 用mqtt客户端工具MqttX向topic=tset.topic.nifi1发送json数据包
  2. 用python脚本作为消费者客户端连接kafka,订阅topic=tset.topic.nifi1,获取该数据包

python脚本:

from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json


async def consume_loop(consumer, topics):
    try:
        # 订阅主题
        consumer.subscribe(topics)

        while True:
            # 轮询消息
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(
                        "%% %s [%d] reached end at offset %d\n"
                        % (msg.topic(), msg.partition(), msg.offset())
                    )
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                # 正常消息
                raw_message = msg.value()
                print(f"Raw message: {raw_message}")
                parsed_message = json.loads(raw_message.decode("utf-8"))
                print(f"Received message: {type(parsed_message)} : {parsed_message}")
            await asyncio.sleep(0.01)  # 小睡片刻,让出控制权
    finally:
        # 关闭消费者
        consumer.close()


async def consume():
    # 消费者配置
    conf = {
        "bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",
        "group.id": "mygroup1",
        "auto.offset.reset": "earliest",
    }

    # 创建消费者
    consumer = Consumer(conf)
    await consume_loop(consumer, ["tset.topic.nifi1"])


if __name__ == "__main__":
    asyncio.run(consume())
3.2.3 结果

脚本 Nifi 

3.3 KafkaToMqtt
3.3.1 配置 

加ConsumeKafkaProcessor:拉Processor组件下去,点开选ConsumeMQTT

Settings备注
NameConsumeKafka_2_0
Properties备注
Kafka Brokershost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Topic Name(s)test.topic.bus / test.device.*
Group IDtest

加PublishMQTT Processor:拉Processor组件下去,点开选PublishMQTT

Settings备注
NamePublishMQTT
Automatically terminate relationships

failure / success 勾选

Properties备注
Broker URItcp://host001.dev.ia:1883
Client IDnifi-xio1-pub1
Username/Password--
Topic Filtertest.topic.bus
QoS0

 拖动ConsumeKafka_2_0连接PublishMQTT, 会添加一个队列连接组件命名为 Message

正确运行如图:

3.3.1 测试

说明:

  1. python脚本向Kafka发布消息到 topic = test.topic.bus
  2. MqttX客户端订阅接收

脚本

from confluent_kafka import Producer
import json


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def create_async_producer(config):
    """Creates an instance of an asynchronous Kafka producer."""
    return Producer(config)


def produce_messages(producer, topic, messages):
    """Asynchronously produces messages to a Kafka topic."""
    for message in messages:
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)

        # Asynchronously produce a message, the delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        producer.produce(
            topic, json.dumps(message).encode("utf-8"), callback=delivery_report
        )

    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()


if __name__ == "__main__":
    # Kafka configuration
    # Replace these with your server's configuration
    conf = {
        "bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",  # Replace with your Kafka server addresses
        # "client.id": "python-producer",
    }

    # Create an asynchronous Kafka producer
    async_producer = create_async_producer(conf)

    # Messages to send to Kafka
    messages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]

    # Produce messages
    # produce_messages(async_producer, "test.topic.bus", messages_to_send)
    produce_messages(async_producer, "test.device.mw3039kkj001", messages_to_send)
3.3.1 结果 

MqttX 

Nifi 

4. 总结:

       Nifi支持集群化部署,如此从数据采集,数据流动到数据存储都实现了分布式,而且有可视化的界面可方便地进行数据节点的集聚与增减配置,目前只是浅尝即止,更深入的研究待后续不断补充优化。

4.1 知识点
Nifi Kafka Processor 配置字典:
Delivery Guarantee

数据传递保证

  1. Best Effort (尽力交付,相当于ack=0)
  2. Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中的默认配置):
  3. Guarantee Replicated Delivery(保证复制交付,相当于ack=-1)
Use Transactions

使用事务 

true / false 

Topic通配符:
“/”

主题层级分隔符

如果存在分隔符,它将主题名分割为多个主题层级。

如:room401/tv/contrl/sensor

“#”多层通配符

匹配主题中任意层级的通配符

如果客户端订阅主题 “china/guangzhou/#”, 它会收到使用下列主题名发布的消息

china/guangzhou china/guangzhou/huangpu china/guangzhou/tianhe/zhongshanlu china/guangzhou/tianhe/zhongshanlu/num123

school/#                //也匹配单独的 “school” ,因为 # 包括它的父级。
#                       //是有效的,会收到所有的应用消息。
school/teacher/#        //有效的。
school/teacher#         //无效的。
school/teacher/#/lever  //无效的,必须是主题过滤器的最后一个字符
https://blog.51cto.com/u_16099203/10959511

“+”单层通配符

单个主题层级匹配的通配符。在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。

china/+ 只能匹配 china/guangzhou

china/+/+/zhongshanlu 能匹配china/guangzhou/tianhe/zhongshanlu和china/shenzhen/nanshan/zhongshanlu

“$”匹配一个字符$xx
/$xx
/xx$

5. 参考:

- https://zhuanlan.zhihu.com/p/628628189

- https://zhuanlan.zhihu.com/p/697301397

- https://blog.51cto.com/u_16213319/7344183

- Apache NiFi Docker Compose | All About

- https://blog.51cto.com/u_16099203/10959511

- 大数据NiFi(二十一):监控日志文件生产到Kafka-腾讯云开发者社区-腾讯云

- PublishMQTT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1952375.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

web学习笔记(八十三)git

目录 1.Git的基本概念 2.gitee常用的命令 3.解决两个人操作不同文件造成的冲突 4.解决两个人操作同一个文件造成的冲突 1.Git的基本概念 git是一种管理代码的方式&#xff0c;广泛用于软件开发和版本管理。我们通常使用gitee&#xff08;码云&#xff09;来云管理代码。 …

使用SpringTask框架

目录 一.什么是SpringTask&#xff1f; 二.cron表达式&#xff1a; 三.SpringTask框架的使用操作&#xff1a; 1.导入maven坐标spring-context&#xff1a; 2.启动类添加 EnableScheduling 以此来开启任务调度&#xff1a; 3.自定义定时任务类&#xff1a; 普通案例&#…

如何写好技术文档 - 来自Google十多年的文档经验

[导读]本文大部分内容翻译总结自《Software Engineering at Google》第10章节 Documentation。另外&#xff0c;该书电子版近日已经可以免费阅读了https://qiangmzsx.github.io/Software-Engineering-at-Google/#/?idsoftware-engineering-at-google&#xff0c;有兴趣的同学可…

Dockerfile指令详解和Docker操作命令

1.容器的特点&#xff1a;1&#xff09;自包含&#xff08;包括应用程序及其运行环境&#xff09;&#xff1b;2&#xff09;可移植&#xff1b;3&#xff09;相互隔离&#xff1b;4&#xff09;轻量级。 2.docker成为容器的事实标准在于&#xff1a;1&#xff09;在运行环境上…

Unity 资源 之 Pop It 3D 解压玩具与双人AI游戏 Unity 资源包分享

精彩呈现&#xff1a;Pop It 3D 解压玩具与双人AI游戏 Unity 资源包分享 一、Pop It 3D 解压玩具的魅力二、双人游戏的互动乐趣三、Unity 游戏资源包的优势四、如何获取资源包 亲爱的游戏爱好者们&#xff0c;今天为大家带来一款令人兴奋的游戏资源——Pop It 3D 解压玩具双人带…

ubuntu串口重命名助手arm64架构(下)Qt交叉编译arm64

✨✨ Rqtz 个人主页 : 点击✨✨ &#x1f388;PyQt系列专栏:点击&#x1f388; &#x1f388;Qt智能车上位机专栏: 点击&#x1f388; &#x1f388;Qt串口助手专栏:点击&#x1f388; &#x1f4ab;宗旨:共享IT之美,共创机器未来 目录 前言 设备介绍 查询系统架构 下载…

【JVM基础05】——组成-能不能解释一下方法区?

目录 1- 引言&#xff1a;方法区概述1-1 方法区是什么&#xff1f;(What)1-2 为什么用方法区&#xff1f;方法区的作用 (Why) 2- ⭐核心&#xff1a;详解方法区(How)2-1 能不能解释一下方法区&#xff1f;2-2 元空间内存溢出问题2-3 什么是常量池&#xff1f;2-4 运行时常量池 …

Stable Diffusion WebUI本地环境搭建

一、项目代码下载 git clone https://github.com/AUTOMATIC1111/stable-diffusion-webui 二、环境配置 conda create --n stafu python3.10.6 实际上跟自己创建的环境没有关系&#xff0c;项目启动会自动复制这个环境&#xff0c;之后项目根据这个基础环境构建 也可以在自己…

【五】MySql8基于m2芯片arm架构Ubuntu24虚拟机安装

文章目录 1. 更新系统包列表2. 安装 MySQL APT Repository3. 更新系统包列表4. 安装 MySQL Server5. 运行安全安装脚本6. 验证 MySQL 安装7. 配置远程连接7.1 首先要确认 MySQL 配置允许远程连接&#xff1a;7.2 重启 MySQL 服务&#xff1a;7.3 检查 MySQL 用户权限&#xff1…

TensorRT推理时间不稳定的解决方案

目录 解决方案 第一步 第二步 效果 解除锁频方法 解决方案 锁定GPU时钟频率&#xff08;实测有效&#xff09; 第一步 使用 nvidia-smi -q -d SUPPORTED_CLOCKS 查询GPU支持的最高频率&#xff0c;如下图所示为 8001 MHz 第二步 cmd&#xff08;管理员权限&#xff09…

0719_驱动2 编写编译linux内核模块

一、编写linux内核模块 linux内核模块三要素&#xff1a; 入口&#xff1a;执行insmod 安装命令操作 出口&#xff1a; 执行rmmod 卸载命令操作 许可证&#xff1a;遵循GPL协议&#xff0c;开源&#xff0c;指定入口地址&#xff0c;出口地址 #include <linux/init.h> #i…

JAVA同城圈子达人交友系统源码支持微信小程序+公众号+H5+APP

&#x1f308; 同城圈子达人交友系统&#xff0c;遇见志同道合的TA&#xff01; &#x1f389; 开篇&#xff1a;告别孤单&#xff0c;同城圈子等你来探索&#xff01; 在这个快节奏的城市生活中&#xff0c;你是否常常感到孤独&#xff0c;渴望找到一群志同道合的朋友&#…

SQL labs-SQL注入(三,sqlmap使用)

本文仅作为学习参考使用&#xff0c;本文作者对任何使用本文进行渗透攻击破坏不负任何责任。 引言&#xff1a; 盲注简述&#xff1a;是在没有回显得情况下采用的注入方式&#xff0c;分为布尔盲注和时间盲注。 布尔盲注&#xff1a;布尔仅有两种形式&#xff0c;ture&#…

Django学习(二)

get请求 练习&#xff1a; views.py def test_method(request):if request.method GET:print(request.GET)# 如果链接中没有参数a会报错print(request.GET[a])# 使用这个方法&#xff0c;当查询不到参数时&#xff0c;不会报错而是返回你设置的值print(request.GET.get(c,n…

深入解析Flowable:工作流与业务流程管理引擎

深入解析Flowable&#xff1a;工作流与业务流程管理引擎 引言 在数字化时代&#xff0c;企业对流程自动化的需求日益增长。有效的工作流和业务流程管理系统可以帮助组织提高生产力、优化资源分配以及增强决策支持。Flowable是一款开源的工作流和业务流程管理&#xff08;BPM&a…

Hadoop单机版环境搭建

一 . 案例信息 Hadoop 的安装部署的模式一共有三种&#xff1a; 本地模式&#xff0c;默认的模式&#xff0c;无需运行任何守护进程&#xff08; daemon &#xff09;&#xff0c;所有程序都在单个 JVM 上执行。由 于在本机模式下测试和调试 MapReduce 程序较为方便&#x…

Ghost Buster Pro for Mac:系统优化的得力助手

Ghost Buster Pro for Mac是一款功能强大的系统优化工具&#xff0c;专为Mac用户设计&#xff0c;旨在提供全方位的系统清理、优化和维护服务。 这款软件拥有出色的垃圾清理能力&#xff0c;能够深度扫描并清除Mac上的无效目录、文件、系统日志、下载历史记录、缓存和临时文件…

WARNING: Ignoring invalid distribution -ip警告信息如何去掉?

查看已安装依赖列表的时候&#xff0c;出现了很多警告信息&#xff0c;如何去掉呢&#xff1f; 解决办法 打开这个路径&#xff1a;d:\software\python\python39\lib\site-packages 这种波浪线开头的&#xff0c;我们将它删除掉,就可以了。

使用 leanback 库 GridView 管理AnroidTV的焦点

一、前情提要 我当前需要开发一个TV应用&#xff0c;但是之前处理过的焦点问题的很少&#xff0c;现在空下来了&#xff0c;对过往的工作做一个总结分享。在手机APP开发中常用的 RecycleView 在 TV 中开发时&#xff0c;无法解决大量的焦点问题&#xff0c;所以使用leanback进…

OSPF LSA 格式及字段详解

在 AS 内的每台设备&#xff0c;根据设备的路由器类型产生一种或多种 LSA。 LSA 的集合形成了 LSDB&#xff08;Link-state Database&#xff09;。 OSPF 中对路由信息的描述都是封装在 LSA 中发布出去的。 常用的 LSA 包括&#xff1a; Router-LSANetwork-LSASummary-LSA&…