kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战

news2024/9/8 23:36:44

1、上一节课我们学习了MQTT producer 生产者步骤,MQTT consumer消费者步骤。该步骤可以从支持MRQTT协议的中间件获取数据,该步骤和kafka consumer 一样可以处理实时数据交互,如下图所示:

 2、双击步骤打开MQTT consumer 配置窗口,如下图所示:

Step name:自定义步骤名称。

Transformation:设置子转换,该子转换的作用是从中间件读取流数据,然后将字段返回给MQTT consumer步骤进行使用。

Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

Topics name:在主题名称字段中,输入您希望订阅流数据(消息)的 MQTT 主题的名称。其实这里的topic是RabbitMQ中的routing key(另外这里的routing key 一定不要绑定队列,否则MQTT consumer步骤无法接收数据)。

Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)

 3、安全验证设置,如下图所示:

Username:MQTT服务器的用户名,如admin

Password:MQTT服务器的密码。

Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

 3、批次设置,使用此选项卡来指定在处理之前要拉取多少消息。您可以指定消息数量和/或特定的时间量。

Duration(ms):
请指定一个以毫秒为单位的时间。此值表示在执行转换之前该步骤将花费多少时间来收集记录。
如果将此选项设置为0,则根据参数Number of records记录数触发消费。要运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Number of records
指定一个数字。在每收集到‘X’条记录之后,指定的转换将被执行,并且这些‘X’条记录将被传递给转换过程。
如果将此选项设置为0,则将参数Duration按持续时间触发消费。为了运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Maximum concurrent batches

指定用于同时收集记录的最大批次数。默认值为1,表示使用单个批次来收集记录。仅当您的消费者步骤无法跟上数据流的速度时,才应使用此选项。
您的计算环境必须具备足够的 CPU 和内存来进行此实现。如果您的环境无法处理指定的最大并发批次数,将会出现错误。


Message prefetch limit
请指定此步骤将排队等待处理的传入消息的限制,即从 kfakfa broker接收到的消息。
设置此值会强制kafka broker处理超过指定限制的消息的背压。默认排队消息的数量是100000条。

 4、字段设置,这里采用默认值就行了,不用调整。

5、子转换结果字段设置,选择子转换返回数据的步骤。 

 6、同上一节课,本次不再介绍。

 7、子转换配置,将Get records from stream步骤拉到画布,然后填写Message、Topic两个字段,类型都是设置为String即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1613330.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sso-oauth2单点登录功能笔记

场景:最近公司2个系统需要做单点登录,A系统作为服务器,认证方式是sso-oauth2方式,B系统作为客户端,token方式是ta-token,先来张sso-oauth2认证方式的图 前置准备工作 第一步:要确认谁是服务提…

Python 全栈安全(一)

原文:annas-archive.org/md5/712ab41a4ed6036d0e8214d788514d6b 译者:飞龙 协议:CC BY-NC-SA 4.0 前言 序言 多年前,我在亚马逊搜索了一本基于 Python 的应用程序安全书。我以为会有多本书可供选择。已经有了很多其他主题的 Pyt…

【Linux】MySQL的安装及配置(Ubuntu-18.04)

一、安装MySQL 分别安装MySQL服务器、MySQL客户端、C/C开发库 sudo apt-get install mysql-server sudo apt-get install mysql-client sudo apt-get install libmysqlclient-dev 二、配置MySQL 1.查看默认配置文件,此处的user和password为默认提供的,…

vulfocus靶场thinkphp命令执行cve-2018-1002015

thinkPHP 5.0.x版本和5.1.x版本中存在远程代码执行漏洞,该漏洞源于ThinkPHP在获取控制器名时未对用户提交的参数进行严格的过滤。远程攻击者可通过输入‘\’字符的方式调用任意方法利用该漏洞执行代码 开启靶场: 使用工具: think…

SpringBoot-无法从static上下文引用同非static方法

1.问题 说明:无法从static上下文引用同非static方法。 2.解决 说明:return后面的语句中,调用的是变量的方法,而不是类型的方法!

ChatGPT研究论文提示词集合3-【数据收集】、【数据分析】和【解释与讨论】

点击下方▼▼▼▼链接直达AIPaperPass ! AIPaperPass - AI论文写作指导平台 目录 1.数据收集 2.数据分析 3.讨论与解释 4.书籍介绍 AIPaperPass智能论文写作平台 近期小编按照学术论文的流程,精心准备一套学术研究各个流程的提示词集合。总共14个步…

从零到一大屏开发过程记录

写在前面,博主是个在北京打拼的码农,凭借多年前端工作经验做过各类项目,最近心血来潮在这儿写点东西,欢迎大家多多指教。 对于文章中出现的任何错误请大家批评指出,一定及时修改。有任何想要讨论和学习的问题可联系我&…

Windows使用freeSSHd搭建sftp服务器

一、安装 1、运行freeSSHd.exe(最好以管理员方式运行) 2、选择安装位置 3、选择全部安装 4、是否创建开始启动栏快捷入口 5、是否创建桌面快捷方式 6、安装 7、安装完成,点击close 8、安装私钥 9、是否要安装为服务 10、全部安装完成 二、配…

强固型工业电脑在码头智能化,龙门吊/流机车载电脑的行业应用

码头智能化行业应用 对码头运营来说,如何优化集装箱从船上到码头堆场到出厂区的各个流程以及达到提高效率。 降低成本的目的,是码头营运获利最重要的议题。为了让集装箱码头客户能够安心使用TOS系统来调度指挥码头上各种吊车、叉车、拖车和人员&#xf…

Linux内核之slab、slub内存分配器实例用法区别(五十八)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…

从零实现诗词GPT大模型:实现Transformer架构

专栏规划: https://qibin.blog.csdn.net/article/details/137728228 首先说明一下,跟其他文章不太一样,在本篇文章中不会对Transformer架构中的自注意力机制进行讲解,而是后面单独1~2篇文章详细讲解自注意力机制,我认为由浅入深的先了解Transformer整体架构和其中比较简单…

Drive Scope for Mac:硬盘健康监测分析工具

Drive Scope for Mac是一款专为Mac用户设计的硬盘健康监测与分析工具,致力于保障用户的数据安全。这款软件功能强大且操作简便,能够实时检测硬盘的各项指标,帮助用户及时发现并解决潜在问题。 Drive Scope for Mac 1.2.23注册激活版下载 Driv…

在RISC-V64架构的CV1811C开发板上应用perf工具进行多线程程序性能分析及火焰图调试

CV1811C环境编译 SDK目录结构 . ├── build // 编译目录,存放编译脚本以及各board差异化配置 ├── buildroot-2021.05 // buildroot开源工具 ├── freertos // freertos系统 ├── fsbl // fsbl启动固件,prebuilt形式存在…

mklink 命令的使用(适用场景:C盘爆满,转移到其他盘)

一、背景 将Oracle数据库安装在D盘,由于磁盘爆满,需要将数据库转移到其他磁盘(如:J盘)。 在移动数据库之后,会出现数据库无法使用的情况,这时该如何解决?经了解,可以使用…

基于Hadoop的电商用户行为分析系统设计与实现的系统架构设计

采集层:利用Flume采集电商服务器端用户行为数据,把数据处理后发送至HDFS。 存储层:用户行为数据采集上传至HDFS存储, 导入到数据仓库Hive进行计算处理,分析结果保存至MySql数据库中。 计算层:根据分析需求建…

基于Java+SpringBoot+vue动物救助平台设计和实现

基于JavaSpringBootvue动物救助平台设计和实现 🍅 作者主页 央顺技术团队 🍅 欢迎点赞 👍 收藏 ⭐留言 📝 🍅 文末获取源码联系方式 📝 🍅 查看下方微信号获取联系方式 承接各种定制系统 &#…

rancher-rke2 修改--service-cluster-ip-range

一、场景 因为需要部署新版本的ingress-nginx,而部署ingress-nginx的时候需要使用hostnetowrk以及nodeport的端口为80和443,service-node-port-range 默认为30000开始,部署会报错。 二、产生修改的需求 1、api-servier的配置文件位置 默认是没有的&…

基于STM32的蓝牙小车的Proteus仿真(虚拟串口模拟)

文章目录 一、前言二、仿真图1.要求2.思路3.画图3.1 电源部分3.2 超声波测距部分3.3 电机驱动部分3.4 按键部分3.5 蓝牙部分3.6 显示屏部分3.7 整体 4.仿真5.软件 三、总结 一、前言 proteus本身并不支持蓝牙仿真,这里我采用虚拟串口的方式来模拟蓝牙控制。 这里给…

【PyTorch】torch.gather() 用法

gather常被用于image做mask的操作中,对哪些地方进行赋值0/1 API: torch.gather — PyTorch 2.2 documentation torch.gather(input, dim, index, outNone) → Tensor gather()的意义: 顾名思义,聚集、集合:gather…

java-单列集合List详解

一、List概述 ​​​​​​​List 接口继承自 Collection 接口。这意味着所有 List 类型的对象都是 Collection 类型的对象,它们共享 Collection 接口中定义的所有方法。 List集合的特点: 1、有序:存和取得元素顺序一致 2、有索引&#xf…