ClickHouse(二十四):Flink 写入 ClickHouse API

news2024/10/6 6:49:52

 

进入正文前,感谢宝子们订阅专题、点赞、评论、收藏!关注IT贫道,获取高质量博客内容!

🏡个人主页:含各种IT体系技术,IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客

📌订阅:拥抱独家专题,你的订阅将点燃我的创作热情!

👍点赞:赞同优秀创作,你的点赞是对我创作最大的认可!

⭐️ 收藏:收藏原创博文,让我们一起打造IT界的荣耀与辉煌!

✏️评论:留下心声墨迹,你的评论将是我努力改进的方向!


目录


可以通过Flink原生JDBC Connector包将Flink结果写入ClickHouse中,Flink在1.11.0版本对其JDBC Connnector进行了重构:

  • 重构之前(1.10.x 及之前版本),包名为 flink-jdbc 。
  • 重构之后(1.11.x 及之后版本),包名为 flink-connector-jdbc 。

二者对 Flink 中以不同方式写入 ClickHouse Sink 的支持情况如下:

API名称

flink-jdbc

flink-connector-jdbc

DataStream

不支持

支持

Table API

支持

不支持

​​​​​​​​​​​​​​1. Flink 1.10.x之前版本使用flink-jdbc,只支持Table API

  • 示例:

1)  maven中需要导入以下包:

<!--添加 Flink Table API 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>

<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

2) 代码:

/**
  *  通过 flink-jdbc API 将 Flink 数据结果写入到ClickHouse中,只支持Table API
  *
  *  注意:
  *   1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
  *   2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数据。
  */

case class PersonInfo(id:Int,name:String,age:Int)

object FlinkWriteToClickHouse1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度为1,后期每个并行度满批次需要的条数时,会插入click中
    env.setParallelism(1)
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)

    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    //读取Socket中的数据
    val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
    val ds: DataStream[PersonInfo] = sourceDS.map(line => {
      val arr: Array[String] = line.split(",")
      PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
    })

    //将 ds 转换成 table 对象
    import org.apache.flink.table.api.scala._
    val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)

    //将table 对象写入ClickHouse中
    //需要在ClickHouse中创建表:create table flink_result(id Int,name String,age Int) engine = MergeTree() order by id;
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    //准备ClickHouse table sink
    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl("jdbc:clickhouse://node1:8123/default")
      .setUsername("default")
      .setPassword("")
      .setQuery(insertIntoCkSql)
      .setBatchSize(2) //设置批次量,默认5000条
      .setParameterTypes(Types.INT, Types.STRING, Types.INT)
      .build()

    //注册ClickHouse table Sink,设置sink 数据的字段及Schema信息
    tableEnv.registerTableSink("ck-sink",
      sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))

    //将数据插入到 ClickHouse Sink 中
    tableEnv.insertInto(table,"ck-sink")

    //触发以上执行
    env.execute("Flink Table API to ClickHouse Example")

  }
}

2. Flink 1.11.x之后版本使用flink-connector-jdbc,只支持DataStream API

  • 示例:

1) 在Maven中导入以下依赖包

<!-- Flink1.11 后需要 Flink-client包-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

2) 代码

/**
  *  Flink 通过 flink-connector-jdbc 将数据写入ClickHouse ,目前只支持DataStream API
  */
object FlinkWriteToClickHouse2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度为1
    env.setParallelism(1)
    import org.apache.flink.streaming.api.scala._

    val ds: DataStream[String] = env.socketTextStream("node5",9999)

    val result: DataStream[(Int, String, Int)] = ds.map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    })

    //准备向ClickHouse中插入数据的sql
    val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    //设置ClickHouse Sink
    val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
      //插入数据SQL
      insetIntoCkSql,

      //设置插入ClickHouse数据的参数
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
          ps.setInt(1, tp._1)
          ps.setString(2, tp._2)
          ps.setInt(3, tp._3)
        }
      },
      //设置批次插入数据
      new JdbcExecutionOptions.Builder().withBatchSize(5).build(),

      //设置连接ClickHouse的配置
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUrl("jdbc:clickhouse://node1:8123/default")
        .withUsername("default")
        .withUsername("")
        .build()
    )

    //针对数据加入sink
    result.addSink(ckSink)

    env.execute("Flink DataStream to ClickHouse Example")

  }
}

👨‍💻如需博文中的资料请私信博主。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/910582.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

光伏发电+boost+储能+双向dcdc+并网逆变器控制(低压用户型电能路由器仿真模型)【含个人笔记+建模参考】

MATALB代码链接&#xff1a;光伏发电boost十储能十双向dcdc十并网逆变器 个人笔记与建模参考请私信发送 包含Boost、Buck-boost双向DCDC、并网逆变器三大控制部分 boost电路应用mppt&#xff0c; 采用扰动观察法实现光能最大功率点跟踪 电流环的逆变器控制策略 双向dcdc储能系…

酷开会员 | 酷开系统给孩子更好的选择

暑假到来&#xff0c;很多家长对孩子看电视的行为感到无力&#xff1a;孩子放假在家一天到晚就对着电视&#xff0c;作业不拖到最后一刻绝不写&#xff01; 孩子早上醒来就吵着看电视&#xff0c;一看就收不住&#xff0c;不吃不喝的。家长则每天都在和孩子斗智斗勇&#xff0…

Pandas学习(完成文件写入、追加写入、读取操作)

问题引入 现在有这么一个需求 我要对我的很多设备进行快照处理&#xff0c;打完快照之后需要记录我的设备IP和快照时间 当我们解决了需求的其他内容&#xff0c;只剩记录信息的时候&#xff0c;可以怎么做呢 这时候就可以引入我们的pandas模块啦&#xff0c;它对数据进行一系列…

Docker常用操作命令(一)

Docker常用操作命令 1、搜索镜像 docker search命令搜索存放在 Docker Hub中的镜像,此命令默认Docker会在Docker Hub中搜索镜像&#xff0c;可以配置了其他镜像仓库 [rootzch01 ~]# docker search centos NAME:镜像仓库名称DESCRIPTION:镜像仓库描述STARS&#xff1a;镜像仓…

spring cloud 之 dubbo nacos整合

整体思路&#xff1a; 搭建本地nacos服务&#xff0c;详见docker安装nacos_xgjj68163的博客-CSDN博客 共三个工程&#xff0c;生产者服务、消费者服务、生产者和消费者共同依赖的接口工程&#xff08;打成jar&#xff0c;供生产者和消费者依赖&#xff09;&#xff1b; …

【面试题】前端面试复习6---性能优化

前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 性能优化 一、性能指标 要在 Chrome 中查看性能指标&#xff0c;可以按照以下步骤操作&#xff1a; 打开 Chrome 浏览器&#xff0c;并访问你想要测试…

【 欧凯 网页 test】

骨钙素&#xff08;BGP&#xff09; 抗体参数 名称抗人骨钙素抗体&#xff08;BGP antibody&#xff09;应用平台免疫荧光&#xff0c;化学发光货号K135c2K131c1推荐用途捕获检测来源鼠单抗&#xff0c;体外培养获得缓冲液1PBS纯度Protein A/G纯化&#xff0c;纯度>96%储存…

结构型(五) - 适配器模式

一、概念 适配器模式&#xff08;Adapter Pattern&#xff09;&#xff1a;这个模式就是用来做适配的&#xff0c;它将不兼容的接口转换为可兼容的接口&#xff0c;让原本由于接口不兼容而不能一起工作的类可以一起工作。 应用场景&#xff1a;适配器模式是一种事后的补救策略…

分布式事务(4):两阶段提交协议与三阶段提交区别

1 两阶段提交协议 两阶段提交方案应用非常广泛&#xff0c;几乎所有商业OLTP数据库都支持XA协议。但是两阶段提交方案锁定资源时间长&#xff0c;对性能影响很大&#xff0c;基本不适合解决微服务事务问题。 缺点&#xff1a; 如果协调者宕机&#xff0c;参与者没有协调者指…

通过springBoot自动装配实现api封装

1.在resource目录下创建META-INF目录&#xff0c;并在其中创建resources\META-INF\spring.factories org.springframework.boot.autoconfigure.EnableAutoConfiguration\com.tanhua.autoconfig.TanhuaAutoConfiguration springBoot在启动之后会自动扫描这个文件&#xff0c;并…

SAP LTMC基础教程之物料主数据详细操作示例

SAP LTMC基础教程之物料主数据详细操作示例 SAP S/4HANA 1610版本的推出已经不再建议使用LSMW了&#xff0c;使用中会受到很多限制&#xff08;比如特性、类的导入&#xff09;&#xff0c;而是推出了新工具LTMC。记录并分享LTMC的操作。 有几个注意点能够搞明白基本都能成功…

爱校对如何帮助企业和博客主提高在线可见性?

在数字化时代&#xff0c;内容质量已经成为增强在线曝光率的关键因素。企业和博客主经常面临挑战&#xff0c;如何制作高质量、无误的内容以吸引更多的在线用户。此文将详细分析“爱校对”如何帮助用户优化内容&#xff0c;从而提高在线可见性。 1.互联网内容的挑战 搜索引擎…

git介绍+集成到IDEA中+使用gitee

目录 git介绍 本地工作流程 IDEA集git 添加到暂存区 添加到本地仓库 gitee使用 添加到远程仓库 git介绍 git是一个开源的分布式版本控制工具&#xff0c;效率高。可以记录历史代码&#xff0c;多人代码共享 知识小点&#xff1a; 集中式版本控制&#xff1a;使用中央存…

在浏览器中打包 TypeScript 系列2:在浏览器中打包 TypeScript

原文地址 这是“在浏览器中打包 TypeScript 系列”的第 2 部分。 第 1 部分&#xff1a;ES 模块和导入映射import maps 打包和转译( Bundling & Transpiling ) 毫无疑问&#xff0c;打包和转译对于 Web 开发至关重要。在深入讨论该主题之前&#xff0c;让我们重申一下什…

Java动态代理、反射

文章目录 动态代理调用者--->代理--->对象为什么需要代理代理的详细实现过程代码详情 反射反射概念反射中常用的方法所有代码 动态代理 调用者—>代理—>对象 动态代理就是无侵入式的给代码增加新的功能&#xff0c;通过接口保证后面的对象和代理需要实现同一个接…

Kubernetes教程—查看 Pod 和节点

目标 了解 Kubernetes Pod。了解 Kubernetes 节点。对已部署的应用故障排除。 Kubernetes Pod 在模块 2 中创建 Deployment 时, Kubernetes 创建了一个 Pod 来托管你的应用实例。Pod 是 Kubernetes 抽象出来的&#xff0c; 表示一组一个或多个应用容器&#xff08;如 Docker…

Nexus2迁移升级到Nexus3

与 Nexus 2.x 相比&#xff0c;Nexus 3.x 为我们提供了更多实用的新特性。SonaType 官方建议我们&#xff0c;使用最新版本 Nexus 2.x 升级到最新版本 Nexus 3.x&#xff0c;并在 Nexus 升级兼容性 一文中为我们提供了各个版本 Nexus 升级到最新版本 Nexus 3.x 的流程&#xff…

opencv如何调用YOLOv5(无pytorch)

目录 一、前言 二.正文 2.1定义颜色 2.2目标检测主代码详解 2.3读取视频or图片进行检测 注意&#xff1a;opencv-python 本文使用的版本为4.5.2.52 一、前言 YOLO系列是one-stage且是基于深度学习的回归方法&#xff0c;而R-CNN、Fast-RCNN、Faster-RCNN等是two-stage且…

情人节特别定制:多种语言编写动态爱心网页(附完整代码)

写在前面案例1&#xff1a;HTML Three.js库案例2&#xff1a;HTML CSS JavaScript案例3&#xff1a;Python环境 Flask框架结语 写在前面 随着七夕节的临近&#xff0c;许多人都在寻找独特而令人难忘的方式来表达爱意。在这个数字时代&#xff0c;结合创意和技术&#xff0…

maven 从官网下载指定版本

1. 进入官网下载页面 Maven – Download Apache Maven 点击下图所示链接 2. 进入文件页&#xff0c;选择需要的版本 3. 选binaries 4. 选文件&#xff0c;下载即可