PiflowX组件-ReadFromKafka

news2024/12/26 20:47:01

ReadFromKafka组件

组件说明

从kafka中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”读取数据的topic名。亦支持用分号间隔的topic列表,如 ‘topic-1;topic-2’。" "注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic-1
topic_patternTOPIC_PATTERN“”匹配读取topic名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的topic都将被Kafka consumer订阅。注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic1_*
startup_modeSTARTUP_MODE“”Set(“earliest-offset”, “latest-offset”, “group-offsets”, “timestamp”, “specific-offsets”)Kafka consumer 的启动模式。earliest-offset
schemaSCHEMA“”Kafka消息的schema信息。id:int,name:string,age:int
formatFORMAT“”Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”)用来反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。json
groupGROUP“”Kafka source的消费组id。如果未指定消费组ID,则会使用自动生成的"KafkaSource-{tableIdentifier}"作为消费组ID。group_1
propertiesPROPERTIES“”Kafka source连接器其他配置

ReadFromKafka示例配置

{
  "flow": {
    "name": "DataGenTest",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "0000",
        "name": "DataGen1",
        "bundle": "cn.piflow.bundle.flink.common.DataGen",
        "properties": {
          "schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]",
          "count": "100",
          "ratio": "5"
        }
      },
      {
        "uuid": "1111",
        "name": "WriteToKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "test",
          "schema": "",
          "format": "json",
          "properties": "{}"
        }
      },
      {
        "uuid": "2222",
        "name": "ReadFromKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "test",
          "group": "test",
          "startup_mode": "earliest-offset",
          "schema": "id:int,name:string,age:int",
          "format": "json",
          "properties": "{}"
        }
      },
      {
        "uuid": "3333",
        "name": "ShowData1",
        "bundle": "cn.piflow.bundle.flink.common.ShowData",
        "properties": {
          "showNumber": "5000"
        }
      }
    ],
    "paths": [
      {
        "from": "DataGen1",
        "outport": "",
        "inport": "",
        "to": "WriteToKafka1"
      },
      {
        "from": "WriteToKafka1",
        "outport": "",
        "inport": "",
        "to": "ReadFromKafka1"
      },
      {
        "from": "ReadFromKafka1",
        "outport": "",
        "inport": "",
        "to": "ShowData1"
      }
    ]
  }
}
示例说明

本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。

字段描述
[
    {       
        "filedName": "id",
        "filedType": "INT",
        "kind": "sequence",
        "start": 1,
        "end": 10000
    },
        {       
        "filedName": "name",
        "filedType": "STRING",
        "kind": "random",
        "length": 15
    },
        {       
        "filedName": "age",
        "filedType": "INT",
        "kind": "random",
        "max": 100,
        "min": 1
    } 
]

1.id字段

id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.

2.name字段

name字段类型为STRING,使用random生成器,生成字符长度为15。

3.age字段

age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1343616.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

腾讯云标准型S5服务器4核8G配置优惠价格表

腾讯云4核8G服务器S5和轻量应用服务器优惠价格表,轻量应用服务器和CVM云服务器均有活动,云服务器CVM标准型S5实例4核8G配置价格15个月1437.3元,5年6490.44元,轻量应用服务器4核8G12M带宽一年446元、529元15个月,腾讯云…

malloc、calloc、realloc、free函数的使用及注意事项

malloc函数 malloc函数的返回值为void*类型 内存管理函数操作的内存是在堆区空间 malloc函数使用示例 free(p)相当于值传递,不能改变p本身。 free只是释放了空间,释放后p依然指向原地址,故需要手动置NULL。 calloc函数 calloc可以指定开辟n个…

【Week-P3】CNN天气识别

文章目录 一、环境配置二、准备数据三、搭建网络结构四、开始训练五、查看训练结果六、总结6.1 不改变学习率的前提下,将训练epoch分别增加到50、60、70、80、90(1)epoch 50 的训练情况如下:(2)epoch 60 …

UE4运用C++和框架开发坦克大战教程笔记(十二)(第37~39集)

UE4运用C和框架开发坦克大战教程笔记(十二)(第37~39集) 37. 延时事件系统38. 协程逻辑优化更新39. 普通按键绑定 37. 延时事件系统 由于梁迪老师是写 Unity 游戏出身的,所以即便 UE4 有自带的 TimeManager 这样的延时…

直方图与均衡化

直方图 统计图像中相同像素点的数量。 使用cv2.calcHist(images, channels, mask, histSize, ranges)函数 images:原图像图像格式为uint8或float32,当传入函数时应用[]括起来,例如[img]。 channels:同样用中括号括起来&#xff…

pytest pytest-html优化样式

conftest.py import pytest from pytest_metadata.plugin import metadata_keydef pytest_html_report_title(report):report.title"接口测试报告"def pytest_configure(config):# 获取命令行参数中的测试环境、测试版本、开始时间、测试人员config.stash[metadata_…

鸿蒙Harmony(七)ArkUI--循环foreachList组件自定义组件

循环foreach import Prompt from system.promptclass Item {icon: Resourcename: stringprice: numberconstructor(icon: Resource, name: string, price: number) {this.icon iconthis.name namethis.price price} }Entry Component struct Index {State message: string …

Linux 安装Jupyter notebook 并开启远程访问

文章目录 安装Python安装pip安装Jupyter启动Jupyter Notebook1. 生成配置文件2. 创建密码3. 修改jupyter notebook的配置文件4. 启动jupyter notebook5. 远程访问jupyter notebook 安装Python 确保你的系统上已经安装了Python。大多数Linux发行版都预装了Python。你可以在终端…

flutter 之proto

和嵌入式用proto协议来通信,以mac来演示 先在电脑上安装protobuf(在博主文章内容里面搜Mac安装protobuf),然后在桌面上放这几个文件,且build_proto_dart.sh文件内容如图所示 #!/bin/bashSCRIPT$(readlink -f "$0…

NFC物联网智能学生宿舍系统设计方案

随着物联网技术的不断发展,智慧城市、智能家居、智慧校园的建设也在如火如茶地进行。本文结合物联网发展过程中相关的技术,应用到智慧校园的建设过程中,将学生宿舍打造成舒适、安全的集体空间,该系统可以对学生宿舍实现智能开门、…

Python爬虫教程30:Selenium网页元素,定位的8种方法!

Selenium可以驱动浏览器,完成各种网页浏览器的模拟操作,比如模拟点击等。要想操作一个元素,首先应该识别这个元素。人有各种的特征(属性),我们可以通过其特征找到人,如通过身份证号、姓名、家庭…

云短信平台优惠活动 - 华为OD统一考试

OD统一考试 题解: Java / Python / C++ 题目描述 某云短信厂商,为庆祝国庆,推出充值优惠活动。 现在给出客户预算,和优惠售价序列,求最多可获得的短信总条数。 输入描述 第一行客户预算M,其中 0<=M<=100 第二行给出售价表,P1,P2,… Pn, 其中 1<=n<=100…

下载和安装AD14 - Altium Designer 14.3.20.54863

这个版本应该还支持XP 系统[doge]&#xff0c;总之就是想安装一下&#xff0c;没什么特别的意义。 下载 资源来自毛子网站&#xff1a;https://rutracker.net/forum/viewtopic.php?t5140739&#xff0c;带上个网页翻译插件就行。要用磁力链接下载&#xff0c;推荐用qbittorr…

一篇文章深入认识微服务SpringCloud和Dubbo的区别

1、SpringCloud是什么 SpringCloud, 基于SpringBoot提供了一套微服务解决方案&#xff0c;包括服务注册与发现&#xff0c;配置中心&#xff0c;全链路监控&#xff0c;服务网关&#xff0c;负载均衡&#xff0c;熔断器等组件&#xff0c;除了基于NetFlix的开源组件做高度抽象…

沙特电子签证照片尺寸要求及手机自拍制作方法介绍

Hey小伙伴们&#xff0c;准备去沙特阿拉伯旅行的朋友们注意啦&#xff01;沙特驻华大使馆对签证所需照片是有要求的&#xff0c;今天我要分享给大家的是关于沙特签证照片的尺寸和拍摄要求&#xff0c;让你的签证申请过程更加顺利哦&#xff01;此外&#xff0c;也教大家一种在家…

maven命令行安装依赖测试

mvn dependency:get -DgroupIdorg.springframework -DartifactIdspring-core -Dversion5.3.9作用&#xff1a;可用于测试配置环境变量后&#xff0c;能否下载依赖到本地仓库

基于JAVA+SSM+VUE的前后端分离的大学竞赛管理系统

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; 随着互联网技术的快速…

初识Sringboot3+vue3环境准备

环境准备 后端环境准备 下载JDK17https://www.oracle.com/java/technologies/downloads/#jdk17-windows 安装就下一步下一步,选择安装路径 配置环境 环境 JDK17、IDEA2021、maven3.5、vscode 后端 基础&#xff1a;javaSE&#xff0c;javaWeb、JDBC、SMM框架&#xff08;Spr…

如何使用idea部署springboot项目全过程

博主介绍&#xff1a; ✌至今服务客户已经1000、专注于Java技术领域、项目定制、技术答疑、开发工具、毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精彩专栏 推荐订阅 &#x1f447;&#x1f3fb; 不然下次找不到 Java项目精品实…

0开始配置Cartographer建图和导航定位

0开始配置Cartographer 日期&#xff1a;12-19 硬件&#xff1a;激光雷达IMU 小车的tf变换&#xff1a; 建图配置 lua文件配置&#xff1a;my_robot.lua include "map_builder.lua" include "trajectory_builder.lua"options {map_builder MAP_BUILDE…