PiflowX组件-WriteToKafka

news2025/1/23 6:07:24

WriteToKafka组件

组件说明

将数据写入kafka。

计算引擎

flink

有界性

Streaming Append Mode

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”写入的topic名。注意不支持topic列表。test
schemaSCHEMA“”Kafka消息的schema信息。若不指定,将从上游输入数据推断。id:int,name:string,age:int
formatFORMAT“”Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”)用来序列化或反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。json
propertiesPROPERTIES“”Kafka source连接器其他配置

WriteToKafka示例配置

{
  "flow": {
    "name": "DataGenTest",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "0000",
        "name": "DataGen1",
        "bundle": "cn.piflow.bundle.flink.common.DataGen",
        "properties": {
          "schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]",
          "count": "100",
          "ratio": "5"
        }
      },
      {
        "uuid": "1111",
        "name": "WriteToKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "test",
          "schema": "",
          "format": "json",
          "properties": "{}"
        }
      },
      {
        "uuid": "2222",
        "name": "ReadFromKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "test",
          "group": "test",
          "startup_mode": "earliest-offset",
          "schema": "id:int,name:string,age:int",
          "format": "json",
          "properties": "{}"
        }
      },
      {
        "uuid": "3333",
        "name": "ShowData1",
        "bundle": "cn.piflow.bundle.flink.common.ShowData",
        "properties": {
          "showNumber": "5000"
        }
      }
    ],
    "paths": [
      {
        "from": "DataGen1",
        "outport": "",
        "inport": "",
        "to": "WriteToKafka1"
      },
      {
        "from": "WriteToKafka1",
        "outport": "",
        "inport": "",
        "to": "ReadFromKafka1"
      },
      {
        "from": "ReadFromKafka1",
        "outport": "",
        "inport": "",
        "to": "ShowData1"
      }
    ]
  }
}
示例说明

本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。

字段描述
[
    {       
        "filedName": "id",
        "filedType": "INT",
        "kind": "sequence",
        "start": 1,
        "end": 10000
    },
        {       
        "filedName": "name",
        "filedType": "STRING",
        "kind": "random",
        "length": 15
    },
        {       
        "filedName": "age",
        "filedType": "INT",
        "kind": "random",
        "max": 100,
        "min": 1
    } 
]

1.id字段

id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.

2.name字段

name字段类型为STRING,使用random生成器,生成字符长度为15。

3.age字段

age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1344129.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

9种卷积注意力机制创新方法汇总,含2024最新

今天咱们来聊聊卷积注意力机制。 相信各位在写论文的时候都苦恼过怎么更好地改模型,怎么更高效地提高模型的性能和泛化能力吧?我的建议是,不妨考虑考虑卷积注意力。 卷积注意力机制是一种通过关注输入数据中的不同部分来改进模型性能的方法…

数据结构之树 --- 二叉树

目录 定义二叉树的结构体 二叉树的遍历 递归遍历 非递归遍历 链式二叉树的实现 二叉树的功能接口 先序遍历创建二叉树 后序遍历销毁二叉树 先序遍历查找树中值为x的节点 层序遍历 上篇我们对二叉树的顺序存储堆进行了讲述,本文我们来看链式二叉树。 定…

台式电源质量如何检测?纳米软件为您科普

一、外观检测 观察台式机电脑电源外观是否有损伤、烧焦,电源线是否有破损、短线的情况。观察电源的电压、电流、功率等参数,是否符合台式机电脑。 二、直观检测 开通电源,如果所有指示灯不亮,风扇没有声音,电源损坏的可…

yolov5 主要流程

1.介绍 本文包含了有关yolov5目标检测的基本流程,包括模型训练与模型部署,旨在帮助小伙伴们建立系统的认知💖💖 YOLO是 "You only look once "的首字母缩写,是一个开源软件工具,它具有实时检测…

Mysql高阶语句及存储过程

目录 空值(NULL) 和 无值() 的区别: 正则表达式: 存储过程: 创建存储过程: 存储过程的参数: 存储过程的控制语句: mysql高阶语句 case是 SQL 用来做为if,then,else 之类逻辑的…

php-fpm运行一段时间,内存不足

目录 一:原因分析 二:解决 三:观察系统情况 php-fpm运行一段时间,内存不足,是什么原因呢。 一:原因分析 1:首先php-fpm的配置 (1)启动的进程数 启动的进程数越多,占用内存越高; 2:其次…

Android studio CMakeLists.txt 打印的内容位置

最近在学习 cmake 就是在安卓中 , 麻烦的要死 , 看了很多的教程 , 发现没有 多少说对打印位置在哪里 , 先说一下版本信息 , 可能你们也不一样 gradle 配置 apply plugin: com.android.applicationandroid {compileSdkVersion 29buildToolsVersion "29.0.3"defau…

2023开发原子开放者大会:AI时代的前端开发,挑战与机遇并存

前言 12月16日,以“一切为了开发者”为主题的开放原子开发者大会在江苏省无锡市开幕。江苏省工业和信息化厅厅长朱爱勋、中国开源软件推进联盟主席陆首群等领导和专家参加开幕式,工业和信息化部信息技术发展司副司长王威伟、江苏省工业和信息化厅副厅长…

视频流媒体直播云服务管理平台EasyNVS长时间运行出现崩溃情况是什么原因?该如何解决?

EasyNVS云管理平台具备汇聚与管理EasyGBS、EasyNVR等平台的能力,可以将接入的视频资源实现统一的视频能力输出,支持远程可视化运维等管理功能,还能解决设备现场没有固定公网IP却需要在公网直播的需求。 有用户反馈,在长时间不间断…

虚拟机和电脑如何传送文件

一.桥接 (实现电脑和虚拟机在同一网段) 虚拟机上网盘设置 二.属性---文件共享设置 1打开属性,点击共享 2.添加共享人为全部人,并修改权限为读写模式 3.点击高级共享,选定此文件夹 4.点击网络和共享中心,划…

js实现前端下载图片和文件资料

说明:下载图片和文档资料是两种不同的方式,所以需要先判断下载的是图片还是word,excel等文件资料 目录 1.文件资料下载: 2.图片资源下载 1.文件资料下载: window.location.href 文件路径; handleClick(item) {let…

S32K312程序快速集成软件看门狗的方法

S32K312的软件看门狗配置比较复杂,如果靠纯手工在外设中进行配置,非常费时间,还不一定好用。 想要快速使用S32K312的软件看门狗,我探索一翻后做了总结: 1、先创建一个官方的示例代码工程(Wdg_Example_S32K…

世界经济论坛制定了五项指导原则,实现跨OT环境的网络安全。

内容概述: 世界经济论坛在其题为“解锁工业环境中的网络弹性:五项原则”的报告中列出:原则一:执行全面风险管理OT 环境、原则二:确保OT工程师和安装操作员对OT网络安全负责、原则三:与高层组织领导、战略规…

一文了解无线通信 - NB-IOT、LoRa

NB-IOT、LoRa 目录概述需求: 设计思路实现思路分析 NB-IOT1.LoRa2.区别 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,chall…

main 函数参数!它们有什么作用?

文章目录 1 主函数定义的标准方式2 为什么main函数需要参数?3 不写参数是否可以?4 两个参数有什么用?5 怎么用?6 总结 1 主函数定义的标准方式 int main (void) { body } //第一种 int main (int argc, char *argv[]) { body …

矩阵微分笔记(2)

目录 前言基本求导规则1. 向量变元的实值标量函数1.1 4个法则1.2 常用公式 2. 矩阵变元的实值标量函数2.1 4个法则2.2 常用公式 参考 前言 这篇笔记的内容是基于参考的文章写出的,公式部分可以会沿用文章本来的式,但会加入我自己的一些思考以及注释&…

Spring-6-事务管理

事务是构建可靠企业级应用程序的最关键部分之一。 最常见的事务类型是数据库操作。 在典型的数据库更新操作中,首先数据库事务开始,然后数据被更新,最后提交或回滚事务(根据数据库操作的结果而定)。但是,在很多情况下&#xff0…

【目标检测】yolov8结构及代码分析

yolov8代码:https://github.com/ultralytics/ultralytics yolov8的整体结构如下图(来自mmyolo): yolov8的配置文件: # Ultralytics YOLO 🚀, AGPL-3.0 license # YOLOv8 object detection model with P3-P5 outputs.…

main参数传递、反汇编、汇编混合编程

week03 一、main参数传递二、反汇编三、汇编混合编程 一、main参数传递 参考 http://www.cnblogs.com/rocedu/p/6766748.html#SECCLA 在Linux下完成“求命令行传入整数参数的和” 注意C中main: int main(int argc, char *argv[]), 字符串“12” 转为12,可以调用atoi…

简单了解SQL宽字节注入与httpXFF头注入(基于sqllabs演示)

1、宽字节注入 sqllabs-less-32为例 使用单引号进行测试 提示我们输入的单引号被转义符 \ 进行了转义,即转义符自动的出现在输入的特殊字符前面,这是防止sql注入的一种方法,导致无法产生报错。 这种情况我们就可以尝试宽字节注入&#xff…