PiflowX组件-WriteToUpsertKafka

news2025/1/15 20:07:34

WriteToUpsertKafka组件

组件说明

以upsert方式往Kafka topic中写数据。

计算引擎

flink

有界性

Streaming Upsert Mode

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:
"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。
"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
sink_parallelismSinkParallelism“”定义upsert-kafka sink算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
sink_buffer_flush_max_rowsSinkBufferFlushMaxRows“”缓存刷新前,最多能缓存多少条记录。当sink收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。 可以通过设置为 ‘0’ 来禁用它默认,该选项是未开启的。注意,如果要开启sink缓存,需要同时设置 ‘sink.buffer-flush.max-rows’ 和 'sink.buffer-flush.interval两个选项为大于零的值。
sink_buffer_flush_intervalSinkBufferFlushInterval“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
propertiesPROPERTIES“”Kafka source连接器其他配置

WriteToUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{
  "flow": {
    "name": "UpsertKafkaTest",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "0000",
        "name": "JsonStringParser1",
        "bundle": "cn.piflow.bundle.flink.json.JsonStringParser",
        "properties": {
          "content": "[{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:32:24\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1201\",\"access_time\":\"2021-01-08 11:32:55\",\"dt\":\"2021-01-08\"},{\"user_id\":\"2\",\"client_ip\":\"192.165.12.1\",\"client_info\":\"pc\",\"page_code\":\"1031\",\"access_time\":\"2021-01-08 11:32:59\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1101\",\"access_time\":\"2021-01-08 11:33:24\",\"dt\":\"2021-01-08\"},{\"user_id\":\"3\",\"client_ip\":\"192.168.10.3\",\"client_info\":\"pc\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:33:30\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:34:24\",\"dt\":\"2021-01-08\"}]",
          "schema": "user_id:STRING,client_ip:STRING,client_info:STRING,page_code:STRING,access_time:TIMESTAMP,dt:STRING"
        }
      },
      {
        "uuid": "1111",
        "name": "WriteToKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "user_ip_pv",
          "tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":null,\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"user_id\",\"columnType\":\"STRING\",\"comment\":\"用户ID\"},{\"columnName\":\"client_ip\",\"columnType\":\"STRING\",\"comment\":\"客户端IP\"},{\"columnName\":\"client_info\",\"columnType\":\"STRING\",\"comment\":\"设备机型信息\"},{\"columnName\":\"page_code\",\"columnType\":\"STRING\",\"comment\":\"页面代码\"},{\"columnName\":\"access_time\",\"columnType\":\"TIMESTAMP\",\"comment\":\"请求时间\"},{\"columnName\":\"dt\",\"columnType\":\"STRING\",\"comment\":\"时间分区天\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null}",
          "format": "json",
          "properties": "{\"json.ignore-parse-errors\":\"true\"}"
        }
      },
      {
        "uuid": "2222",
        "name": "ReadFromKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "user_ip_pv",
          "group": "test",
          "startup_mode": "earliest-offset",
          "tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":\"source_ods_fact_user_ip_pv\",\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"user_id\",\"columnType\":\"STRING\",\"comment\":\"用户ID\"},{\"columnName\":\"client_ip\",\"columnType\":\"STRING\",\"comment\":\"客户端IP\"},{\"columnName\":\"client_info\",\"columnType\":\"STRING\",\"comment\":\"设备机型信息\"},{\"columnName\":\"page_code\",\"columnType\":\"STRING\",\"comment\":\"页面代码\"},{\"columnName\":\"access_time\",\"columnType\":\"TIMESTAMP\",\"comment\":\"请求时间\"},{\"columnName\":\"dt\",\"columnType\":\"STRING\",\"comment\":\"时间分区天\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null}",
          "format": "json",
          "properties": "{}"
        }
      },
      {
        "uuid": "3333",
        "name": "SQLExecute1",
        "bundle": "cn.piflow.bundle.flink.common.SQLExecute",
        "properties": {
          "sql": "CREATE VIEW view_total_pv_uv_min AS SELECT dt AS do_date, count(client_ip) AS pv, count(DISTINCT client_ip) AS uv,max(access_time) AS access_time FROM source_ods_fact_user_ip_pv GROUP BY dt;"
        }
      },
      {
        "uuid": "4444",
        "name": "WriteToUpsertKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.WriteToUpsertKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "result_total_pv_uv_min",
          "key_format": "json",
          "value_format": "json",
          "value_fields_include": "ALL",
          "tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":\"result_total_pv_uv_min\",\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null,\"asSelectStatement\":\"SELECT  do_date,cast(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,pv,uv,NOW() AS currenttime from view_total_pv_uv_min\"}",
          "properties": "{\"value.json.fail-on-missing-field\": false}"
        }
      }
    ],
    "paths": [
      {
        "from": "JsonStringParser1",
        "outport": "",
        "inport": "",
        "to": "WriteToKafka1"
      },
      {
        "from": "WriteToKafka1",
        "outport": "",
        "inport": "",
        "to": "ReadFromKafka1"
      },
      {
        "from": "ReadFromKafka1",
        "outport": "",
        "inport": "",
        "to": "SQLExecute1"
      },
      {
        "from": "SQLExecute1",
        "outport": "",
        "inport": "",
        "to": "WriteToUpsertKafka1"
      }
    ]
  }
}
示例说明
  1. 通过JsonStringParser将给定的json字符串解析,并输出到下游,通过WriteToKafka组件将数据写入到kafka的user_ip_pv topic中;

  2. 通过ReadFromKafka组件从user_ip_pv topic中读取数据;

  3. 使用SQLExecute组件执行创建视图view_total_pv_uv_min的语句;

  4. 使用WriteToUpsertKafka定义upsert kafka table,并使用tableDefinition属性中定义的asSelectStatement执行语句,将结果写入kafka。

tableDefinition属性结构
{
  "catalogName": null,
  "dbname": null,
  "tableName": "result_total_pv_uv_min",
  "ifNotExists": true,
  "physicalColumnDefinition": [
    {
      "columnName": "do_date",
      "columnType": "STRING",
      "nullable": false,
      "primaryKey": true,
      "partitionKey": false,
      "comment": "统计日期"
    },
    {
      "columnName": "do_min",
      "columnType": "STRING",
      "nullable": false,
      "primaryKey": true,
      "partitionKey": false,
      "comment": "统计分钟"
    },
    {
      "columnName": "pv",
      "columnType": "BIGINT",
      "nullable": false,
      "primaryKey": false,
      "partitionKey": false,
      "comment": "点击量"
    },
    {
      "columnName": "uv",
      "columnType": "BIGINT",
      "nullable": false,
      "primaryKey": false,
      "partitionKey": false,
      "comment": "一天内同个访客多次访问仅计算一个UV"
    },
    {
      "columnName": "currenttime",
      "columnType": "TIMESTAMP",
      "nullable": false,
      "primaryKey": false,
      "partitionKey": false,
      "comment": "当前时间"
    }
  ],
  "metadataColumnDefinition": null,
  "computedColumnDefinition": null,
  "watermarkDefinition": null,
  "asSelectStatement": "SELECT  do_date,cast(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,pv,uv,NOW() AS currenttime from view_total_pv_uv_min"
}

演示DEMO
在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1348993.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QT音频编程实战项目(一)ui搭建和槽函数的完成

第一个类实现播放音乐,第二个类实现歌曲列表,第三个类是播放本地歌曲 上边是歌曲的总时长,下边是当前播放的时长。 所需要的槽函数如上图。 这个是构造函数: …

基于JavaWeb实验室预约管理系统(源码+数据库+文档)

一、项目简介 本项目是一套基于JavaWeb实验室预约管理系统,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含:项目源码、数据库脚本等,该项目附带全部源码可作为毕设使用。 项目都经过严格调试,e…

OFDM——PAPR减小

文章目录 前言一、PAPR 减小二、MATLAB 仿真1、OFDM 信号的 CCDF①、MATLAB 源码②、仿真结果 2、单载波基带/通频带信号的 PAPR①、MATLAB 源码②、仿真结果 3、时域 OFDM 信号和幅度分布①、MATLAB 源码②、仿真结果 4、Chu 序列和 IEEE802.16e 前导的 PAPR①、MATLAB 源码②…

ToDesk Linux 客户端安装(欧拉系统)

下载链接 下载链接 https://newdl.todesk.com/linux/todesk-v4.3.1.0-x86_64.rpm (使用4.3.1.0覆盖安装后,临时密码将会变更) 安装命令: sudo rpm -Uvh todesk-v4.3.1.0-x86_64.rpm启动命令: todesk启动命令只能在桌…

算法通关村第二十关-白银挑战图的存储与遍历

大家好我是苏麟, 今天继续聊图 . 与前面的链表、树等相比,图的存储和遍历要复杂非常多 .所以理解就好 , 面试基本不会让写代码的 . 图的类型多、表示方式多,相关算法也很多,实现又过于复杂,多语言实现难度太大了。这些算法一般理…

Ultra ISO 虚拟光驱修改光盘盘符

windows xp 环境 ultra iso 虚拟光驱修改光盘盘符 method 1. 在ultra iso 中 [选项]->[配置]->[虚拟光驱],在新盘符里选指定盘符 ->[修改] method 2. 打开命令行,进入安装目录,如 "C:\Program Files\UltraISO\drivers"&…

浅析锂电池保护板(BMS)系统设计思路(四)SOC算法-扩展Kalman滤波算法

1 SOC估算方法介绍 电池SOC的估算是电池管理系统的核心,自从动力电池出现以来,各种各样的电池SOC估算方法不断出现。随着电池管理系统的逐渐升级,电池SOC估算方法的效率与精度不断提高,下面将介绍常用几种电池SOC估算方法[1]&…

2023年总结及2024年规划:我们结婚啦

目录 1、回首2023 1.1、生活方面 1.2、工作方面 1.3、学习方面 2、展望2024 2.1、生活方面 2.2、工作方面 2.3、学习方面 2023年最重要的事情当然是我们结婚啦! 1、回首2023 1.1、生活方面 今年五一假期,我和对象回老家在双方亲友的见证下完…

HarmonyOS 路由传参

本文 我们来说两个page界面间的数据传递 路由跳转 router.pushUrl 之前我们用了不少了 但是我们只用了它的第一个参数 url 其实他还有个params参数 我们第一个组件可以编写代码如下 import router from ohos.router Entry Component struct Index {build() {Row() {Column() …

ES6之Proxy详解

✨ 专栏介绍 在现代Web开发中,JavaScript已经成为了不可或缺的一部分。它不仅可以为网页增加交互性和动态性,还可以在后端开发中使用Node.js构建高效的服务器端应用程序。作为一种灵活且易学的脚本语言,JavaScript具有广泛的应用场景&#x…

虚拟专线网络(IP-VPN)

虚拟专线网络(IP-VPN),因为它的安全性和可靠性。通过亚洲领先的 IP VPN 提供商。享受更高的可管理性和可扩展性,在多个站点之间交付 IP 流量或数据包,拥有亚太地区最大的 IP 骨干网。 1,保证正常运行时间,在网络链路发…

设计模式设计原则——依赖倒置原则(DIP)

DIP:Dependence Inversion Principle 原始定义:High level modules should not depend upon low level modules. Both should depend upon abstractions. Abstractions should not depend upon details. Details should depend upon abstractions。 官…

合伙企业有哪些分类

合伙企业分为:普通合伙企业和有限合伙企业。其中,普通合伙企业又包含特殊的普通合伙企业。 1、普通合伙企业由2人以上普通合伙人(没有上限规定)组成。 普通合伙企业中,合伙人对合伙企业债务承担无限连带责任。 特殊的普通合伙企业中&#xf…

docker 在线安装mysql 8.0.21版本

1、拉取mysql 8.0.21版本镜像 2、查看镜像 docker images 3、在宿主机 /usr/local/mysql 下的 conf 文件夹下,创建 my.cnf 文件,并编辑内容 [mysql] default-character-setutf8 [client] port3306 default-character-setutf8 [mysqld] port3306 se…

【基础篇】五、类的双亲委派机制

文章目录 1、双亲委派机制2、Java代码中去主动加载一个类3、“父”加载器4、Q & A5、打破双亲委派机制 1、双亲委派机制 JVM中有多个类加载器,某个类A,到底该由谁去加载 ⇒ 双亲委派机制 该机制的作用: 保证类加载的安全性:避…

VMvare虚拟机中文件夹共享防火墙设置

目录 一、虚拟机jdk及tomcat配置 1.1 JDK配置 1.2 tomcat配置 二、文件夹共享 2.1 为什么需要配置文件夹共享功能 2.2 高级共享和普通共享 三、防火墙设置 入站规则和出站规则 四、思维导图 一、虚拟机jdk及tomcat配置 1.1 JDK配置 (1) 双击jdk (2&#xf…

个体诊所软件方案,农村医疗服务站社区门诊电子处方管理系统软件教程

个体诊所软件方案,农村医疗服务站社区门诊电子处方管理系统软件教程 一、软件程序问答 1、处方单软件有病历汇总吗 如下图,软件以 佳易王电子处方软件V17.2版本为例说明 点击 病历汇总统计 按钮, 可以按明细查询或病历汇总查询&#xf…

交通 | 司乘匹配:基于增量成本计算的优化算法

编者按: 司乘匹配是打车服务中一项至关重要的任务,如果这一步做得不够优化,可能导致乘客需要更长的时间才能到达目的地,同时司机的收入也会因此减少。由于司乘匹配是一个持续进行的过程,每一时刻都在不断涌入新的打车…

MD5算法

一、引言 MD5(Message-Digest Algorithm 5)是一种广泛应用的密码散列算法,由Ronald L. Rivest于1991年提出。MD5算法主要用于对任意长度的消息进行加密,将消息压缩成固定长度的摘要(通常为128位)。在密码学…

微信小程序开发系列-11组件间通信02

微信小程序开发系列目录 《微信小程序开发系列-01创建一个最小的小程序项目》 《微信小程序开发系列-02注册小程序》 《微信小程序开发系列-03全局配置中的“window”和“tabBar”》 《微信小程序开发系列-04获取用户图像和昵称》 《微信小程序开发系列-05登录小程序》 《…