ClickHouse--11--ClickHouse API操作

news2025/1/13 13:39:20

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 1.Java 读写 ClickHouse API
    • 1.1 首先需要加入 maven 依赖
    • 1.2 Java 读取 ClickHouse 集群表数据
        • JDBC--01--简介
      • ClickHouse java代码
    • 1.3 Java 向 ClickHouse 表中写入数据
  • 2.Spark 写入 ClickHouse API
    • 2.1 导入依赖
    • 2.2 代码编写
  • 3.Flink 写入 ClickHouse API
    • 3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API
    • 3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API


1.Java 读写 ClickHouse API

1.1 首先需要加入 maven 依赖

<!-- 连接 ClickHouse 需要驱动包-->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
     <artifactId>clickhouse-jdbc</artifactId>
     <version>0.2.4</version>
</dependency>

1.2 Java 读取 ClickHouse 集群表数据

JDBC–01–简介

在这里插入图片描述

public class Test01 {

    public static void main(String[] args) throws Exception {
        //1.注册数据库驱动
        Class.forName("com.mysql.jdbc.Driver");
        //2.获取数据库连接
        Connection conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/jt_db?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8",
                "root", "root");

        //3.获取传输器
        Statement stat = conn.createStatement();
        //4.发送SQL到服务器执行并返回执行结果
        String sql = "select * from account";
        ResultSet rs = stat.executeQuery( sql );
        //5.处理结果
        while( rs.next() ) {
            int id = rs.getInt("id");
            String name = rs.getString("name");
            double money = rs.getDouble("money");
            System.out.println(id+" : "+name+" : "+money);
        }
        //6.释放资源
        rs.close();
        stat.close();
        conn.close();
        System.out.println("TestJdbc.main()....");
    }

}
                                            

ClickHouse java代码


import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.ResultSet;
import java.sql.SQLException;

public class test01 {
    public static void main(String[] args) throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("");
        //1.注册数据库驱动配置
        BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123,node2:8123,node3:8123/default", props);
        //2.获取数据库连接
        ClickHouseConnection conn = dataSource.getConnection();
        //3.获取传输器
        ClickHouseStatement statement = conn.createStatement();
        //4.发送SQL到服务器执行并
        ResultSet rs = statement.executeQuery("select id,name,age from test");
        //5.处理结果
        while (rs.next()) {
            int id = rs.getInt("id");
            String name = rs.getString("name");
            int age = rs.getInt("age");
            System.out.println("id = " + id + ",name = " + name + ",age = " + age);
        }

        //6.释放资源
        conn.close();
        statement.close();
        rs.close();
    }
}

1.3 Java 向 ClickHouse 表中写入数据

package com.cy.demo;

import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.sql.ResultSet;
import java.sql.SQLException;

public class test01 {
    public static void main(String[] args) throws SQLException {
        ClickHouseProperties props = new ClickHouseProperties();
        props.setUser("default");
        props.setPassword("");
        //1.注册数据库驱动配置
        BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123/default", props);
        //2.获取数据库连接
        ClickHouseConnection conn = dataSource.getConnection();
        //3.获取传输器
        ClickHouseStatement statement = conn.createStatement();
        //4.发送SQL到服务器执行并
        statement.execute("insert into test values (100,'王五',30)");//可以拼接批量插入多条
       
        //6.释放资源
        conn.close();
        statement.close();
        rs.close();
    }
}

在这里插入图片描述

2.Spark 写入 ClickHouse API

  • SparkCore 写入 ClickHouse,可以直接采用写入方式。下面案例是使用 SparkSQL 将结果存入 ClickHouse对应的表中。在 ClickHouse 中需要预先创建好对应的结果表

2.1 导入依赖

        <!-- 连接 ClickHouse 需要驱动包-->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
            <!-- 去除与 Spark 冲突的包 -->
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Spark-core -->
        <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL ON Hive-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>

2.2 代码编写

val session: SparkSession =
SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonList = List[String](
"{\"id\":1,\"name\":\"张三\",\"age\":18}",
"{\"id\":2,\"name\":\"李四\",\"age\":19}",
"{\"id\":3,\"name\":\"王五\",\"age\":20}"
)
//将 jsonList 数据转换成 DataSet
import session.implicits._
val ds: Dataset[String] = jsonList.toDS()
val df: DataFrame = session.read.json(ds)
df.show()
//将结果写往 ClickHouse
val url = "jdbc:clickhouse://node1:8123/default"
val table = "test"
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "")
properties.put("socket_timeout", "300000")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url,
table, properties)

3.Flink 写入 ClickHouse API

  • 可以通过 Flink 原生 JDBC Connector 包将 Flink 结果写入 ClickHouse 中,Flink 在1.11.0 版本对其 JDBC Connnector 进行了重构:

在这里插入图片描述

3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API

  1. maven 中需要导入以下包:
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码:
/**
* 通过 flink-jdbc API 将 Flink 数据结果写入到 ClickHouse 中,只支持 Table API
*
* 注意:
* 1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
* 2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数
据。
*/
case class PersonInfo(id:Int,name:String,age:Int)
object FlinkWriteToClickHouse1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1,后期每个并行度满批次需要的条数时,会插入 click 中
env.setParallelism(1)
val settings: EnvironmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取 Socket 中的数据
val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
val ds: DataStream[PersonInfo] = sourceDS.map(line => {
val arr: Array[String] = line.split(",")
PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
})
//将 ds 转换成 table 对象
import org.apache.flink.table.api.scala._
val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)
//将 table 对象写入 ClickHouse 中
//需要在 ClickHouse 中创建表:create table flink_result(id Int,name String,age Int) engine =
MergeTree() order by id;
val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//准备 ClickHouse table sink
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://node1:8123/default")
.setUsername("default")
.setPassword("")
.setQuery(insertIntoCkSql)
.setBatchSize(2) //设置批次量,默认 5000 条
.setParameterTypes(Types.INT, Types.STRING, Types.INT)
.build()
//注册 ClickHouse table Sink,设置 sink 数据的字段及 Schema 信息
tableEnv.registerTableSink("ck-sink",
sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))
//将数据插入到 ClickHouse Sink 中
tableEnv.insertInto(table,"ck-sink")
//触发以上执行
env.execute("Flink Table API to ClickHouse Example")
}
}

3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API

  1. 在 Maven 中导入以下依赖包
<!-- Flink1.11 后需要 Flink-client 包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码
/**
* Flink 通过 flink-connector-jdbc 将数据写入 ClickHouse ,目前只支持 DataStream API
*/
object FlinkWriteToClickHouse2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val ds: DataStream[String] = env.socketTextStream("node5",9999)
val result: DataStream[(Int, String, Int)] = ds.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
})
//准备向 ClickHouse 中插入数据的 sql
val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//设置 ClickHouse Sink
val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
//插入数据 SQL
insetIntoCkSql,
//设置插入 ClickHouse 数据的参数
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
ps.setInt(1, tp._1)
ps.setString(2, tp._2)
ps.setInt(3, tp._3)
}
},
//设置批次插入数据
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
//设置连接 ClickHouse 的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://node1:8123/default")
.withUsername("default")
.withUsername("")
.build()
)
//针对数据加入 sink
result.addSink(ckSink)
env.execute("Flink DataStream to ClickHouse Example")
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1464143.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1分钟带你了解Python数据类型

1.Python 3 主要有6种标准数据类型 Number&#xff08;数字&#xff09; String&#xff08;字符串&#xff09; List&#xff08;列表&#xff09; Tuple&#xff08;元组&#xff09; Set&#xff08;集合&#xff09; Dictionary&#xff08;字典&#xff09; 2.Numb…

不要浪费

解法&#xff1a; 记录一下tle的代码 #include <iostream> #include <vector> #include <algorithm> using namespace std; #define endl \n bool check(vector<int>& a, int l,int k) {int sum 0;for (int i 0; i < a.size() && l…

【Android 高德地图POI定位地址搜索】

先上演示&#xff1a; 高德地图的key申请这里就不讲了&#xff0c;比较简单&#xff0c;网上有很多资料&#xff0c;或者前往官网查看&#xff1a;官方文档 依赖引入 项目使用了如下依赖&#xff1a; //高德地图implementation com.amap.api:3dmap:latest.integration//地图…

alibabacloud学习笔记06(小滴课堂)

讲Sentinel流量控制详细操作 基于并发线程进行限流配置实操 在浏览器打开快速刷新会报错 基于并发线程进行限流配置实操 讲解 微服务高可用利器Sentinel熔断降级规则 讲解服务调用常见的熔断状态和恢复 讲解服务调用熔断例子 我们写一个带异常的接口&#xff1a;

centos7部署nfs+keepalived+drbd

一、项目需求描述 现在使用的架构是nfskeepalivedrsyncsersync&#xff0c;目前这套架构存在主从nfs节点数据同步不一致问题&#xff0c;大概会有 120s左右的数据延长同步时间&#xff0c;需要提供优化的自动化方案。 二、现有方案缺点 1、切换不能保证主从节点数据一致。 2、…

C++——基础语法(1)

前言 一路磕磕绊绊&#xff0c;也算是走到了C的大门下。C从名字上就可以看出是C语言的“plusplus版本”&#xff0c;C在兼容C语言的基础上又加入了许多方便又高深的特性与机制&#xff0c;便于我们更容易处理C语言中的棘手问题。不得不提的一点是C为我们打开了面向对象思想的大…

【ACM出版】第五届计算机信息和大数据应用国际学术会议(CIBDA 2024)

第五届计算机信息和大数据应用国际学术会议&#xff08;CIBDA 2024&#xff09; 2024 5th International Conference on Computer Information and Big Data Applications 重要信息 大会官网&#xff1a;www.ic-cibda.org 大会时间&#xff1a;2024年3月22-24日 大会地点&#…

Java中哪些很容易出现的坑

文章目录 1空指针2小数的计算3包装类型4Java8 Stream5日期格式化 先来一个简单一点&#xff0c;就从空指针开始吧 1空指针 //多级调用空指针userService.getUser("张三").getUserInfo().getUserName(); //例如getUser("张三")、getUserInfo&#xff08;&a…

基于SpringBoot的景区旅游管理系统

项目介绍 本期给大家介绍一个 景区旅游管理 系统.。主要模块有首页&#xff0c;旅游路线&#xff0c;旅行攻略&#xff0c;在线预定。管理员可以登录管理后台对用户进行管理&#xff0c;可以添加酒店&#xff0c;景区&#xff0c;攻略&#xff0c;路线等信息。整体完成度比较高…

【Java面试系列】JDK 1.8 新特性之 Stream API

目录 一、Stream 简介二、Stream 特点&#xff1a;Stream 注意点&#xff1a;1、什么是聚合操作2、Stream 流1、什么是流2、流的构成3、stream 流的两种操作4、惰性求值和及早求值方法5、Stream 流的并行 三、Stream操作的三个步骤1、创建流第一种&#xff1a;通过集合第二种&a…

2024全球网络安全展望|构建协同生态,护航数字经济

2024年1月&#xff0c;世界经济论坛发布《2024全球网络安全展望》报告&#xff0c;指出在科技快速发展的背景下&#xff0c;网络安全不均衡问题加剧&#xff0c;需加强公共部门、企业组织和个人的合作。 报告强调&#xff0c;面对地缘政治动荡、技术不确定性和全球经济波动&am…

Vue知识学习

Vue 是什么&#xff1f; 概念&#xff1a;Vue 是一个用于构建用户界面的渐进式框架 Vue 的两种使用方式: ① Vue 核心包开发 场景:局部 模块改造 ② Vue 核心包& Vue插件工程化开发 场景:整站开发 创建Vue 实例&#xff0c;初始化渲染的核心步骤: 1.准备容器 2.引包(官…

哪些软件可以把试卷照片转换成电子版?试试这些软件

哪些软件可以把试卷照片转换成电子版&#xff1f;在数字化时代&#xff0c;纸质试卷的保存和传输都显得不太方便。为了解决这个问题&#xff0c;我们可以将试卷照片转换成电子版。下面&#xff0c;我将为大家介绍5款可以轻松实现这一功能的软件&#xff0c;让你轻松应对各种试卷…

吸虫塔的工作原理是什么?

吸虫塔虫情智能测报分析系统是一款专门用于长期动态监测蚜虫等小型迁飞性害虫的大型植保设备&#xff0c;由装置上方的空气动力装置、上下两层远红外虫体处理装置、高清图像采集装置、虫体收集装置等部分组成。昆虫在经由设备上方的吸风装置后会被吸入设备内部&#xff0c;上下…

【超实用!游戏主程必须掌握的必杀技!】

超实用&#xff01;游戏主程必须掌握的必杀技&#xff01; 大家有没有发现&#xff1f;以上问题都存在共性&#xff1a;那就是跨部门的沟通与协作&#xff0c;这是一个必须高度重视的问题。正是因为这些问题的存在&#xff0c;造成初入职场的焦虑和不适应。 那么产生这些问题的…

【大厂AI课学习笔记】【2.2机器学习开发任务实例】(1)搭建一个机器学习模型

今天学习的是&#xff0c;如何搭建一个机器学习模型。 主要有以上的步骤&#xff1a; 原始数据采集特征工程 数据预处理特征提取特征转换&#xff08;构造&#xff09;预测识别&#xff08;模型训练和测试&#xff09; 在实际工作中&#xff0c;特征比模型更重要。 数据和特征…

01_02_mysql06_(视图-存储过程-函数(变量、流程控制与游标)-触发器)

视图 使用 视图一方面可以帮我们使用表的一部分而不是所有的表&#xff0c;另一方面也可以针对不同的用户制定不同的查询视图。比如&#xff0c;针对一个公司的销售人员&#xff0c;我们只想给他看部分数据&#xff0c;而某些特殊的数据&#xff0c;比如采购的价格&#xff0…

flink 任务提交流程源码解析

flinkjob 提交流程 任务启动流程图1客户端的工作内容1.1解析命令1.2 执行用户代码 2集群工作内容2.2启动JobManager和 ResourceManager2.3 申请资源 启动 taskmanager 3分配任务3.1 资源计算3.2 分发任务 4 Task 任务调度执行图5 任务提交过程总结 任务启动流程图 可以先简单看…

【Vue3】toRefs和toRef在reactive中的一些应用

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

如何在debian上实现一键恢复操作系统?

在Debian或任何其他Linux发行版上实现一键恢复操作系统&#xff0c;需要创建一个系统镜像或快照&#xff0c;并设置一个简单的方法来从该镜像恢复。以下是创建和恢复系统的基本步骤&#xff1a; 1. 创建系统镜像&#xff1a; 使用像dd&#xff0c;rsync或专门的备份工具&#…