执行flink sql连接clickhouse库

news2025/1/20 13:06:46

手把手教学,flink connector打通clickhouse大数据库,通过下发flink sql,来使用ck。

组件版本
jdk1.8
flink1.17.2
clickhouse23.12.2.59

1.背景

flink官方不支持clickhouse连接器,工作中难免会用到。

2.方案

利用GitHub大佬提供的源代码,我用的是release-1.16:https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16

3.编译

导入IDEA,maven编译即可,生成flink-connector-clickhouse-1.16.0-SNAPSHOT.jar

4.将此依赖包,导入flink工程

spring boot工程

4.1)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
<!--	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.13</version>
		<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;
	</parent>-->

	<parent>
		<groupId>com.mit.microgrid</groupId>
		<artifactId>mit-microgrid</artifactId>
		<version>${project.build.version}</version>
	</parent>

	<artifactId>mit-microgrid-flink</artifactId>
	<name>mit-microgrid-flink</name>
	<description>flink connector clickhouse</description>

	<properties>
		<java.version>1.8</java.version>
		<flink.version>1.17.2</flink.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
			<!-- 排除SpringBoot自带的日志依赖 -->
			<exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-starter-logging</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
		</dependency>

		<!--flink-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!--flink connector-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-base</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!--flink connector clickhouse-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-clickhouse</artifactId>
			<version>1.16.0-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
<!--			<artifactId>flink-clients_2.12</artifactId>-->
			<version>${flink.version}</version>
<!--			<scope>provided</scope>-->
		</dependency>
		<!-- flink sql -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-loader</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Flink JDBC Connector -->
<!--		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.12</artifactId>
			<version>1.14.6</version> &lt;!&ndash; 与您的Flink版本匹配 &ndash;&gt;
		</dependency>-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>3.1.2-1.17</version>
		</dependency>
		<!-- ClickHouse JDBC Driver -->
		<dependency>
			<groupId>ru.yandex.clickhouse</groupId>
			<artifactId>clickhouse-jdbc</artifactId>
			<version>0.3.2</version> <!-- 请根据实际情况选择最新稳定版本 -->
		</dependency>
		<!-- 添加clickhouse-maven依赖-->
		<dependency>
			<groupId>ru.ivi.opensource</groupId>
			<artifactId>flink-clickhouse-sink</artifactId>
			<version>1.2.0</version>
		</dependency>

		<!--module-->
		<dependency>
			<groupId>com.mit.microgrid</groupId>
			<artifactId>mit-microgrid-common-core</artifactId>
			<version>${project.build.version}</version>
		</dependency>
		<dependency>
			<groupId>com.mit.microgrid</groupId>
			<artifactId>mit-microgrid-api-history</artifactId>
			<version>${project.build.version}</version>
		</dependency>

		<!--sql parse-->
		<dependency>
			<groupId>org.apache.calcite</groupId>
			<artifactId>calcite-core</artifactId>
			<version>1.37.0</version>
		</dependency>
<!--		<dependency>
			<groupId>org.apache.calcite</groupId>
			<artifactId>calcite-server</artifactId>
			<version>1.37.0</version>
		</dependency>-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-parser</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!--mysql-->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.30</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-uber</artifactId>
			<version>1.17.2</version>
		</dependency>

		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-text</artifactId>
			<version>1.12.0</version>
		</dependency>
	</dependencies>

	<build>
		<!--<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
			</plugin>-->
<!--			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.2.4</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers combine.children="append">
								<transformer
										implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>-->
<!--		</plugins>-->

		<finalName>${project.artifactId}</finalName>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<version>2.7.3</version>
				<configuration>
					<mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
					<fork>true</fork>
					<layout>ZIP</layout>
					<includeSystemScope>true</includeSystemScope>
				</configuration>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
						<configuration>
							<classifier>-with-dependencies</classifier>
						</configuration>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>3.3.0</version>
				<configuration>
					<archive>
						<addMavenDescriptor>false</addMavenDescriptor>
						<manifest>
							<mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
							<addClasspath>true</addClasspath>
							<classpathPrefix>lib/</classpathPrefix>
						</manifest>
					</archive>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>3.3.0</version>
				<configuration>
					<descriptors>
						<descriptor>src/main/resources/assembly/assembly.xml</descriptor>
					</descriptors>
					<outputDirectory>./../out</outputDirectory>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>

</project>

4.2)核心方法:

    /**
     * multiple sql execute
     *
     * @param sqlList
     */
    public static JobClient flinkSqlJobClientMultiple(List<String> sqlList) {
        log.info("参数sqlList: {}", sqlList);
//        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
        EnvironmentSettings setting = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(setting);
        if (CollectionUtil.isNullOrEmpty(sqlList)) {
            log.warn("sqlList参数为空");
            return null;
        }
        for (String s : sqlList) {
            TableResult tableResult = tEnv.executeSql(s);
            Optional<JobClient> jobClientOptional = tableResult.getJobClient();
            if (jobClientOptional.isPresent()) {
                JobClient jobClient = jobClientOptional.get();
                log.info("jobClient: " + jobClient);
                return jobClient;
            }
        }
        log.error("没有可执行的job");
        return null;
    }

5.源码地址

https://github.com/genghongsheng0/mit-microgrid-flink

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243171.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

笔记|M芯片MAC (arm64) docker上使用 export / import / commit 构建amd64镜像

很简单的起因&#xff0c;我的东西最终需要跑在amd64上&#xff0c;但是因为mac的架构师arm64&#xff0c;所以直接构建好的代码是没办法跨平台运行的。直接在arm64上pull下来的docker镜像也都是arm64架构。 检查镜像架构&#xff1a; docker inspect 8135f475e221 | grep Arc…

免费送源码:Java+Springboot+MySQL Springboot多租户博客网站的设计 计算机毕业设计原创定制

Springboot多租户博客网站的设计 摘 要 博客网站是当今网络的热点&#xff0c;博客技术的出现使得每个人可以零成本、零维护地创建自己的网络媒体&#xff0c;Blog站点所形成的网状结构促成了不同于以往社区的Blog文化&#xff0c;Blog技术缔造了“博客”文化。本文课题研究的“…

代码随想录第46期 单调栈

这道题主要是单调栈的简单应用 class Solution { public:vector<int> dailyTemperatures(vector<int>& T) {vector<int> result(T.size(),0);stack<int> st;st.push(0);for(int i1;i<T.size();i){if(T[i]<T[st.top()]){st.push(i);}else{wh…

【Three.js基础学习】24. shader patterns

前言 课程回顾: ShaderMaterial 这里用的是着色器材质 所以顶点和片段着色器就不需要像原始着色器那样添加需要的属性 然后写 片段着色器需要属性 &#xff1a; 顶点 属性 -》变化 -》 片段中 顶点中的属性不需要声明 只需要声明传送的变量 例如 varying vec vUv; vUv uv; 补充…

力扣整理版七:二叉树(待更新)

满二叉树&#xff1a;如果一棵二叉树只有度为0的结点和度为2的结点&#xff0c;并且度为0的结点在同一层上&#xff0c;则这棵二叉树为满二叉树。深度为k&#xff0c;有2^k-1个节点的二叉树。 完全二叉树&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能没填满外&am…

173. 二叉搜索树迭代器【 力扣(LeetCode) 】

文章目录 零、原题链接一、题目描述二、测试用例三、解题思路四、参考代码 零、原题链接 173. 二叉搜索树迭代器 一、题目描述 实现一个二叉搜索树迭代器类BSTIterator &#xff0c;表示一个按中序遍历二叉搜索树&#xff08;BST&#xff09;的迭代器&#xff1a; BSTIterato…

自动驾驶系列—深入解析自动驾驶车联网技术及其应用场景

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

STL序列式容器之list

相较于vector的连续性空间&#xff0c;list相对比较复杂&#xff1b;list内部使用了双向环形链表的方式对数据进行存储&#xff1b;list在增加元素时&#xff0c;采用了精准的方式分配一片空间对数据及附加指针等信息进行存储&#xff1b; list节点定义如下 template<clas…

Element-ui Select选择器自定义搜索方法

效果图 具体实现 <template><div class"home"><el-selectref"currencySelect"v-model"currency"filterable:spellcheck"false"placeholder"请选择":filter-method"handleCurrencyFilter"change&q…

Linux debian系统安装ClamTk开源图形用户界面(GUI)杀毒软件

一、ClamTk简介 ClamTk 是一个基于 ClamAV 的开源图形用户界面&#xff08;GUI&#xff09;杀毒软件。它使用 GTK2-Perl 脚本构建而成&#xff0c;支持32位与64位操作系统。ClamTk 提供了一个直观的用户界面&#xff0c;使得用户无需深入了解命令行即可完成大部分操作。它具备…

MIT6.5840 Lab 1: MapReduce(6.824)

结果 介绍 在本实验中&#xff0c;您将构建一个MapReduce系统。您将实现一个调用应用程序Map和Reduce函数并处理文件读写的工作进程&#xff0c;以及一个将任务分发给工作进程并处理失败的工作进程的协调进程。您将构建类似于MapReduce论文的东西。&#xff08;注意&#xff1a…

Kafka进阶_1.生产消息

文章目录 一、Controller选举二、生产消息2.1、创建待发送数据2.2、创建生产者对象&#xff0c;发送数据2.3、发送回调2.3.1、异步发送2.3.2、同步发送 2.4、拦截器2.5、序列化器2.6、分区器2.7、消息可靠性2.7.1、acks 02.7.2、acks 1(默认)2.7.3、acks -1或all 2.8、部分重…

SpringBoot多环境配置的实现

前言 开发过程中必然使用到的多环境案例&#xff0c;通过简单的案例分析多环境配置的实现过程。 一、案例 1.1主配置文件 spring:profiles:active: prod server:port: 80801.2多环境配置文件 开发环境 blog:domain: http://localhost:8080测试环境 blog:domain: https:/…

鸿蒙HarmonyOS 地图定位到当前位置 site查询等操作

应用服务Map使用 地图定位 地点查询及导航 周边查询 点位标记定义等 地图定位 前提地图已经能正常显示&#xff0c;若不能显示请大家参考之前的那篇如何显示地图的博文 地图相关的api 位置效果图&#xff1a; module.json5配置权限 "requestPermissions": [{&…

AntFlow 0.11.0版发布,增加springboot starter模块,一款设计上借鉴钉钉工作流的免费企业级审批流平台

AntFlow 0.11.0版发布,增加springboot starter模块,一款设计上借鉴钉钉工作流的免费企业级审批流平台 传统老牌工作流引擎比如activiti,flowable或者camunda等虽然功能强大&#xff0c;也被企业广泛采用&#xff0c;然后也存着在诸如学习曲线陡峭&#xff0c;上手难度大&#x…

Timeline动画「硬切」的问题

1&#xff09;Timeline动画「硬切」的问题 2&#xff09;移动平台纹理压缩格式选择ASTC&#xff0c;美术出图还需遵守POT吗 3&#xff09;如何去掉DOTS Unity.Entities.Graphics创建的BatchRendererGroup的UI相机回调 4&#xff09;Timeline播放动画会产生位移的问题 这是第409…

《设计模式》创建型模式总结

目录 创建型模式概述 Factory Method: 唯一的类创建型模式 Abstract Factory Builder模式 Prototype模式 Singleton模式 最近在参与一个量化交易系统的项目&#xff0c;里面涉及到用java来重构部分vnpy的开源框架&#xff0c;因为是框架的搭建&#xff0c;所以会涉及到像…

【论文阅读】主动推理:作为感知行为的理论

文章目录 主动推理&#xff1a;作为感知行为的理论摘要1.引言2. 主动推理的概念和历史根源3. 主动推理的规范视角—以及它的发展历程 未完待续 主动推理&#xff1a;作为感知行为的理论 Active inference as a theory of sentient behavior 摘要 这篇文章综述了主动推理的历…

React--》如何高效管理前端环境变量:开发与生产环境配置详解

在前端开发中&#xff0c;如何让项目在不同环境下表现得更为灵活与高效&#xff0c;是每个开发者必须面对的挑战&#xff0c;从开发阶段的调试到生产环境的优化&#xff0c;环境变量配置无疑是其中的关键。 env配置文件&#xff1a;通常用于管理项目的环境变量&#xff0c;环境…

【工具插件类教学】在 Unity 中使用 iTextSharp 实现 PDF 文件生成与导出

目录 一、准备工作 1. 安装 iTextSharp 2. 准备资源文件 二、创建 ExportPDFTool 脚本 1、初始化 PDF 文件,设置字体 2、添加标题、内容、表格和图片 三、使用工具类生成 PDF 四、源码地址 在 Unity 项目中,我们有时会需要生成带有文本、表格和图片的 PDF 文件,以便…