Flink写入数据到ClickHouse

news2025/1/11 2:29:01

文章目录

      • 1.ClickHouse建表
      • 1.ClickHouse依赖
      • 2.Bean实体类
      • 3.ClickHouse业务写入逻辑
      • 4.测试写入类
      • 5.发送数据

1.ClickHouse建表

ClickHouse中建表

CREATE TABLE default.test_write
(
    id   UInt16,
    name String,
    age  UInt16
) ENGINE = TinyLog();

1.ClickHouse依赖

Flink开发相关依赖

    <properties>
        <flink.version>1.12.1</flink.version>
        <scala.version>2.12.13</scala.version>
        <clickhouse-jdbc.version>0.1.54</clickhouse-jdbc.version>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <dependencies>
        <!-- 写入数据到clickhouse -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>${clickhouse-jdbc.version}</version>
        </dependency>
        <!-- flink核心API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

2.Bean实体类

User.java

package com.daniel.bean;

import lombok.Builder;
import lombok.Data;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 * @Description
 **/

@Data
@Builder
public class User {
    public int id;
    public String name;
    public int age;
}

3.ClickHouse业务写入逻辑

ClickHouseSinkFunction.java

package com.daniel.util;

import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:36
 * @Description
 **/


public class ClickHouseSinkFunction extends RichSinkFunction<User> {
    Connection conn = null;
    String sql;

    public ClickHouseSinkFunction(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = getConn("localhost", 8123, "default");
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (conn != null) {
            conn.close();
        }
    }

    // 定义具体的操作
    @Override
    public void invoke(User user, Context context) throws Exception {
        // 批量插入
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        preparedStatement.setLong(1, user.id);
        preparedStatement.setString(2, user.name);
        preparedStatement.setLong(3, user.age);
        preparedStatement.addBatch();

        long startTime = System.currentTimeMillis();
        int[] ints = preparedStatement.executeBatch();
        conn.commit();
        long endTime = System.currentTimeMillis();
        System.out.println("批量插入用时:" + (endTime - startTime) + "ms -- 插入数据行数:" + ints.length);
    }

    public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        conn = DriverManager.getConnection(address);
        return conn;
    }
}
  • open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。

  • invoke():定义了在每个元素到达Sink操作时所执行的逻辑。用户需要实现这个方法来定义如何将数据写入外部存储系统或执行其他操作。

  • close():在SinkFunction关闭之前调用,用于释放资源、关闭连接等操作。

4.测试写入类

ClickHouseWriteTest.java

package com.daniel;

import com.daniel.bean.User;
import com.daniel.util.ClickHouseSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * @Author daniel
 * @Date: 2023/7/3 15:37
 * @Description
 **/

public class ClickHouseWriteTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Source
        DataStream<String> ds = env.socketTextStream("localhost", 9999);

        // Transform
        SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
            String[] split = data.split(",");
            return User.builder()
                    .id(Integer.parseInt(split[0]))
                    .name(split[1])
                    .age(Integer.parseInt(split[2]))
                    .build();
        });

        // Sink
        String sql = "INSERT INTO default.test_write (id, name, age) VALUES (?,?,?)";
        ClickHouseSinkFunction jdbcSink = new ClickHouseSinkFunction(sql);
        dataStream.addSink(jdbcSink);
        env.execute("flink-clickhouse-write");
    }
}

5.发送数据

使用nc或者任意工具向指定端口发送数据
例如

nc -L -p 9999

发送数据

1,Daniel,25
2,David,38
3,James,16
4,Robert,27

然后启动ClickHouseWriteTest.java程序

在这里插入图片描述

查询数据

select *
from default.test_write;

由于这里是并行插入,所以没有顺序可言

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/716139.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

爬虫入门指南(7):使用Selenium和BeautifulSoup爬取豆瓣电影Top250实例讲解【爬虫小白必看】

文章目录 介绍技术要点SeleniumBeautifulSoupOpenpyxl 实现步骤&#xff1a;导入所需库设置网页URL和驱动路径创建 ChromeDriver 服务配置 ChromeDriver创建 Excel 文件爬取数据关闭浏览器保存 Excel 文件 完整代码导出的excel 效果图未完待续.... 介绍 在本篇博客中&#xff…

Day976.如何安全、快速地接入OAuth 2.0? -OAuth 2.0

如何安全、快速地接入OAuth 2.0&#xff1f; Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于如何安全、快速地接入OAuth 2.0&#xff1f;的内容。 授权服务将 OAuth 2.0 的复杂性都揽在了自己身上&#xff0c;这也是授权服务为什么是 OAuth 2.0 体系的核心的原因之…

企业为什么纷纷推崇数字化管理?

企业提倡或推崇数字化管理&#xff0c;通常是因为几个原因&#xff1a; 效率和生产力&#xff1a;数字管理系统简化流程并自动执行重复任务&#xff0c;从而提高效率和生产力。数字工具可以更快地输入、检索和分析数据&#xff0c;减少人工工作和人为错误。这种效率使企业能够在…

STM32速成笔记—Flash闪存

文章目录 一、Flash简介二、STM32F1的Flash三、Flash操作步骤四、程序设计4.1 读取数据4.2 写入数据&#xff08;不检查&#xff09;4.3 写入数据&#xff08;检查&#xff09; 五、注意事项 一、Flash简介 快闪存储器&#xff08;flash memory&#xff09;&#xff0c;是一种…

物理与IP环境的重要性:打造稳定可靠的亚马逊测评环境

在亚马逊平台上进行测评补单、撸卡和撸货等活动&#xff0c;首要问题是确保环境的安全性和稳定性。一个稳定的环境是进行测评和撸卡的基础&#xff0c;如果无法解决安全性问题&#xff0c;那么从事这些项目就不值得。在环境技术研发领域已经有六七年的经验&#xff0c;在早期测…

红利期已过?2023跨境电商还吃香吗?亚马逊还能做吗?

2022年&#xff0c;由于疫情反复和外部因素的影响&#xff0c;跨境电商的情况并不乐观。但这并不意味着跨境电商已经走到了绝境。随着贸易全球化的深入发展&#xff0c;平台规则不断完善&#xff0c;国家相继出台最新的扶持政策&#xff0c;为跨境电商企业带来了更多的发展机遇…

Spring Boot 中的 Zookeeper 分布式锁

Spring Boot 中的 Zookeeper 分布式锁 分布式锁是分布式系统中常用的一个同步工具&#xff0c;它可以在多个进程之间协调访问共享资源&#xff0c;避免数据不一致或重复处理。在分布式环境中&#xff0c;由于网络通信的延迟和节点故障等原因&#xff0c;传统的锁机制无法满足需…

MES与ERP系统的生产计划管理到底有什么不同?

MES 的生产计划管理与 ERP 的生产计划管理到底有什么不同&#xff1f; 生产计划管理是企业发展的重要一环&#xff0c;对于提升企业生产效率&#xff0c;提高客户满意度&#xff0c;降低成本&#xff0c;提高客户满意度等方面都有重要意义。 我们首先来看MES和ERP生产计划管理…

Cisco Catalyst 9000 Series Switches, IOS-XE Release Dublin-17.11.1 ED

Cisco Catalyst 9000 Series Switches, IOS-XE Release Dublin-17.11.1 ED Cisco Catalyst 9000 交换产品系列 请访问原文链接&#xff1a;https://sysin.org/blog/cisco-catalyst-9000/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;…

Vox-E: Text-guided Voxel Editing of 3D Objects(3D目标的文本引导体素编辑)

Vox-E: Text-guided Voxel Editing of 3D Objects &#xff08;3D目标的文本引导体素编辑&#xff09; Paper&#xff1a;https://readpaper.com/paper/1705264952657440000 Code&#xff1a;http://vox-e.github.io/ 原文链接&#xff1a;Vox-E: 3D目标的文本引导体素编辑 &…

如何写出高效的软件测试用例,测试人都用得到

要编写出高效的测试用例&#xff0c;需要搞清楚什么是测试用例&#xff0c;以及如何编写出高效的测试用例&#xff1f;接下来将从以下几个部分来进行展开 1、什么是测试用例 2、如何编写测试用例 一、什么是测试用例 测试用例 &#xff1a;为了特定目的而设计的由一组测试输…

【el-tree大量数据卡顿解决】el-tree利用懒加载解决大数据量卡顿问题,el-tree懒加载回显方法

描述 问题是这样&#xff1a;我的项目中&#xff0c;有一个角色管理的页面。以前的老代码&#xff0c;直接用el-tree渲染的树形结构&#xff0c;勾选设置对应的权限。其他的部门倒是还好&#xff0c;但是涉及到老板的部门设置的时候&#xff0c;由于我们这边的权限太多&#x…

ESP32开发:1、环境搭建(基于vscode+ESP-IDF)

1、ESP-IDF ESP-IDF提供操作ESP32芯片的API函数&#xff0c;供用户编写的用户程序调用。当用户程序编写好后&#xff0c;ESP-IDF需要借助一系列编译工具才能将用户程序API函数编译成能运行在ESP32上的二进制文件。 如上图所示这个1个G左右大的压缩包就是ESP-IDF。如果电脑上已经…

sslTrus (RSA) OV CA

sslTrus (RSA) OV CA品牌证书是建立在Sectigo CA机构的一种面向中国大陆的PKI定制中级根证书服务&#xff0c;OCSP国内本地网络优化&#xff0c;更适合中国网络。采取的Sectigo根证书建立的信任&#xff0c;更是完整。 sslTrus (RSA) OV CA可以选择&#xff1a;单域名、通配符…

生产级Redis Cluster部署(4.0.10版本)

生产级Redis Cluster部署 环境准备 主机名 IP地址 端口 描述 redis-master 192.168.1.51 7000 redis-master01 7001 redis-master02 7002 redis-master03 redis-slave 192.168.1.52 8000 redis-slave01 8001 redis-slave02 8002 redis-slave03 初始化…

代码随想录二刷day42 | 动态规划之背包问题 416. 分割等和子集

day42 416. 分割等和子集确定dp数组以及下标的含义确定递推公式dp数组如何初始化确定遍历顺序举例推导dp数组 416. 分割等和子集 题目链接 解题思路&#xff1a; 这是一维的背包问题 只有确定了如下四点&#xff0c;才能把01背包问题套到本题上来。 背包的体积为sum / 2背包要…

Java 实现快慢指针法返回链表的中间结点

一、思路 这里分为链表结点个数是 奇数 和 偶数 两种情况。 如果是奇数&#xff0c;中间结点只有一个&#xff0c;返回即可&#xff1b;如果是偶数&#xff0c;中间结点则有两个&#xff0c;这里要求返回第二个。 上述图片展示的就是奇数的情况&#xff0c;此时中间结点就是…

找不到“$libdir/postgis-X.X“问题解决方案

背景&#xff1a; 数据库从postgresql-11.9 升级到11.20版本&#xff0c;11.20版本采用了docker镜像 postgis/postgis:11-3.3 (截止20230703实际对应的版本为pg11.20postgis3.3) 升级版本&#xff0c;使用了原来的data&#xff0c;主要版本不变&#xff0c;次要版本升级&#…

文档管理系统是业迈向数字化办公的新时代

随着信息技术的不断发展&#xff0c;企业数字化办公已成为越来越多企业的选择。在数字化办公中&#xff0c;文档管理系统是一个非常重要的组成部分&#xff0c;可以帮助企业打破时空限制&#xff0c;提高工作效率和质量&#xff0c;推动企业向数字化办公的新时代迈进。 什么是…

力扣 39. 组合总和

题目来源&#xff1a;https://leetcode.cn/problems/combination-sum/description/ C题解&#xff1a; 递归法。递归前对数组进行有序排序&#xff0c;可方便后续剪枝操作。 递归函数参数&#xff1a;定义两个全局变量&#xff0c;二维数组result存放结果集&#xff0c;数组pa…