PiflowX组件-ReadFromUpsertKafka

news2025/1/12 16:44:41

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分反序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
propertiesPROPERTIES“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{
  "flow": {
    "name": "ReadFromUpsertKafkaTest",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "5555",
        "name": "ReadFromUpsertKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "result_total_pv_uv_min",
          "key_format": "json",
          "value_format": "json",
          "value_fields_include": "ALL",
          "tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}",
          "properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"
        }
      },
      {
        "uuid": "6666",
        "name": "ShowChangeLogData1",
        "bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData",
        "properties": {
          "showNumber": "5000"
        }
      }
    ],
    "paths": [
      {
        "from": "ReadFromUpsertKafka1",
        "outport": "",
        "inport": "",
        "to": "ShowChangeLogData1"
      }
    ]
  }
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{
    "ifNotExists": true,
    "physicalColumnDefinition": [{
        "columnName": "do_date",
        "columnType": "STRING",
        "nullable": false,
        "primaryKey": true,
        "partitionKey": false,
        "comment": "统计日期"
    }, {
        "columnName": "do_min",
        "columnType": "STRING",
        "nullable": false,
        "primaryKey": true,
        "partitionKey": false,
        "comment": "统计分钟"
    }, {
        "columnName": "pv",
        "columnType": "BIGINT",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "点击量"
    }, {
        "columnName": "uv",
        "columnType": "BIGINT",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "一天内同个访客多次访问仅计算一个UV"
    }, {
        "columnName": "currenttime",
        "columnType": "TIMESTAMP",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "当前时间"
    }],
    "metadataColumnDefinition": null,
    "computedColumnDefinition": null
}

演示DEMO

在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1349264.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java技术专题】「入门到精通系列」深入探索Java技术中常用到的六种加密技术和代码

深入探索Java技术中常用到的六种加密技术和实现 背景介绍柯克霍夫原则加密机制加密类型密码学原则 加密常用代表组件加密算法介绍Base64算法消息摘要算法(Message Digest)数据指纹MD5MD5算法的工作原理 SHASHA工作原理 对称加密DESDES的原理分析 3DES3DE…

性能优化(CPU优化技术)-ARM Neon详细介绍

本文主要介绍ARM Neon技术,包括SIMD技术、SIMT、ARM Neon的指令、寄存器、意图为读者提供对ARM Neon的一个整体理解。 🎬个人简介:一个全栈工程师的升级之路! 📋个人专栏:高性能(HPC&#xff09…

VMware虚拟机和Centos7镜像安装

文章目录 安装VMware虚拟机1、下载2、激活 安装Centos7镜像启动虚拟机 安装VMware虚拟机 1、下载 建议还是安装16版本 VMware16下载 https://www.123pan.com/s/HQeA-aX1Sh VMware15 链接:https://pan.baidu.com/s/11UD1hb6IydbxNNPxmh-MqA?pwd0630 提取码&am…

2022年全国职业院校技能大赛(高职组)“云计算”赛项赛卷①第一场次:私有云

2022年全国职业院校技能大赛(高职组) “云计算”赛项赛卷1 第一场次:私有云(30分) 目录 2022年全国职业院校技能大赛(高职组) “云计算”赛项赛卷1 第一场次:私有云&#xff0…

DDC和PLC的区别

前言 PLC与DDC控制器的比较,一直以来在相关领域内受到广泛关注。每个人站在不同的角度分析,都会有不同的结论,我们今天聊聊这个话题。 基本定义和功能 可编程控制器PLC与直接数字控制器DDC,两者都由CPU模块、I/O模块、显示模块…

张量操作与线性回归

一、张量的操作:拼接、切分、索引和变换 (1)张量拼接与切分 1.1 torch.cat() 功能:将张量按维度dim进行拼接 • tensors: 张量序列 • dim : 要拼接的维度 torch.cat(tensors, dim0, outNone)函数用于沿着指定维度dim将多个张量…

CGAL的AABB tree

1、介绍 AABB树组件提供了一种静态数据结构和算法,用于对有限的三维几何对象集进行高效的交集和距离查询。可以查询数据结构中存储的几何对象集,以进行交集检测、交集计算和距离计算。 交集查询可以是任何类型的,只要在traits类中实现了相应的…

2024.1.1 hive_sql 题目练习,开窗,行列转换

重点知识: 在使用group by时,select之后的字段要么包含在聚合函数里,要么在group by 之后 进行行转列,行转列的核心就是使用concat_ws函数拼接(分隔符,内容), -- 以及collect_list函数进行收集,list不去重, set去重无序 列转行,核心就是使用炸裂函数把东…

DSL查询语法和RestClient查询文档

目录 DSL查询语法 DLS Query的分类 DSL Query基本语法 全文检索查询 精准查询 地理查询 复合查询 Function Score Query 复合查询 Boolean Query 搜索结果处理 排序 分页 分页 深度分页问题 深度分也解决方案 高亮 RestClient查询文档 快速入门 全文检索查…

将学习自动化测试时的医药管理信息系统项目用idea运行

将学习自动化测试时的医药管理信息系统项目用idea运行 背景 学习自动化测试的时候老师的运行方式是把医药管理信息系统项目打包成war包后再放到tomcat的webapp中去运行,于是我想着用idea运行会方便点,现在记录下步骤方便以后查找最开始没有查阅资料&am…

【心得】PHP反序列化高级利用(phar|session)个人笔记

目录 ①phar反序列化 ②session反序列化 ①phar反序列化 phar 认为是java的jar包 calc.exe phar能干什么 多个php合并为独立压缩包,不解压就能执行里面的php文件,支持web服务器和命令行 phar协议 phar://xxx.phar $phar->setmetadata($h); m…

LanceDB:在对抗数据复杂性战役中,您可信赖的坐骑

LanceDB 建立在 Lance(一种开源列式数据格式)之上,具有一些有趣的功能,使其对 AI/ML 具有吸引力。例如,LanceDB 支持显式和隐式矢量化,能够处理各种数据类型。LanceDB 与 PyTorch 和 TensorFlow 等领先的 M…

三菱人机交互GT Designer的使用(三,指示灯,数值显示与输入,字符串显示与输入,日期|时间的显示)

今天继续对GT进行学习,如有不妥,欢迎指正!!! 目录 指示灯设置 设置指示灯 位指示灯 字指示灯 数值输入,输出(二者差距不大) 数值显示与输出 数值显示(只能显示&…

【Maven】工程依赖下载失败错误解决

在使用 Maven 构建项目时,可能会发生依赖项下载错误的情况,主要原因有以下几种: 下载依赖时出现网络故障或仓库服务器宕机等原因,导致无法连接至 Maven 仓库,从而无法下载依赖。 依赖项的版本号或配置文件中的版本号错…

【计算机毕业设计】ssm+mysql+jsp实现的在线bbs论坛系统源码

项目介绍 jspssm(springspringMVCmybatis)MySQL实现的在线bbs论坛系统源码,本系统主要实现了前台用户注册登陆、浏览帖子、发布帖子、个人信息管理、消息通知管理,积分管理,后台管理功能有:友情链接管理、…

怎么设计一个简单又直观的接口?

文章目录 问题的开端为什么从问题开始?自然而来的接口 一个接口一件事情减少依赖关系使用方式要“傻” 小结 开放的接口规范是使用者和实现者之间的合约。既然是合约,就要成文、清楚、稳定。合约是好东西,它可以让代码之间的组合有规可依。但…

Stable Diffusion API入门:简明教程

Stable Diffusion 是一个先进的深度学习模型,用于创造和修改图像。这个模型能够基于文本描述来生成图像,让机器理解和实现用户的创意。使用这项技术的关键在于掌握其 API,通过编程来操控图像生成的过程。 在探索 Stable Diffusion API 的世界…

爱思唯尔的KBS——模板、投稿、返修、接收的总结

第二篇论文终于是接受了QAQ,被审稿人疯狂拖时间,KBS是真难绷啊 由于之前发布过关于爱思唯尔旗下的ESWA博客,KBS和ESWA是类似的,因此本篇博客主要说下区别以及期间碰到的各种情况,有疑问依然可以在评论区说,…

【C语言】函数

函数是什么? “函数”是我们早些年在学习数学的过程中常见的概念,简单回顾一下:比如下图中,你给函数 f(x)2*x3 一个具体的x,这个函数通过一系列的计算来返回给你一个结果(图示如下)。 这就是数学中函数的基本过程和作用。但是你…

48、激活函数 - 梯度消失和梯度爆炸

简单介绍下梯度消失和梯度爆炸,这个不是重点,但是我觉得有必要再深入了解这个概念,以及很多激活函数为什么是可以防止梯度消失的。 梯度消失和梯度爆炸实际上是在神经网络训练过程中经常会遇到的两类问题,这两类问题都与梯度有关。 什么是梯度 在神经网络训练中,梯度是指…