Flink同步Kafka数据到ClickHouse分布式表

news2025/1/26 10:43:19

公众号文章都在个人博客网站:https://www.ikeguang.com/ 同步,欢迎访问。

业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定ClickHouse

什么是ClickHouse?

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。

列式数据库更适合于OLAP场景(对于大多数查询而言,处理速度至少提高了100倍),下面详细解释了原因(通过图片更有利于直观理解),图片来源于ClickHouse中文官方文档。

行式

5cec60671a4b90bdfb7259ea28d2dec6.gif

列式

38fea0fdc5715b1757962b616ca2a922.gif

我们使用Flink编写程序,消费kafka里面的主题数据,清洗、归一,写入到clickhouse里面去。

这里的关键点,由于第一次使用,无法分清应该建立什么格式的clickhouse表,出现了一些问题,最大的问题就是程序将数据写入了,查询发现数据不完整,只有一部分。我也在网上查了一些原因,总结下来。

为什么有时看不到已经创建好的表并且查询结果一直抖动时多时少?

  • 常见原因1:

建表流程存在问题。ClickHouse的分布式集群搭建并没有原生的分布式DDL语义。如果您在自建ClickHouse集群时使用create table创建表,查询虽然返回了成功,但实际这个表只在当前连接的Server上创建了。下次连接重置换一个Server,您就看不到这个表了。

解决方案:建表时,请使用create table <table_name> on cluster default语句,on cluster default声明会把这条语句广播给default集群的所有节点进行执行。示例代码如下。Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple(); 在test表上再创建一个分布式表引擎,建表语句如下。Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));

  • 常见原因2:

ReplicatedMergeTree存储表配置有问题。ReplicatedMergeTree表引擎是对应MergeTree表引擎的主备同步增强版,在单副本实例上限定只能创建MergeTree表引擎,在双副本实例上只能创建ReplicatedMergeTree表引擎。

解决方案:在双副本实例上建表时,请使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)为固定配置,无需修改。

这里引出了复制表的概念,这里介绍一下,只有 MergeTree 系列里的表可支持副本:

  • ReplicatedMergeTree

  • ReplicatedSummingMergeTree

  • ReplicatedReplacingMergeTree

  • ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree

  • ReplicatedVersionedCollapsingMergeTree

  • ReplicatedGraphiteMergeTree

副本是表级别的,不是整个服务器级的。所以,服务器里可以同时有复制表和非复制表。副本不依赖分片。每个分片有它自己的独立副本。

创建复制表

先做好准备工作,该建表的建表,然后编写程序。在表引擎名称上加上 Replicated 前缀。例如:ReplicatedMergeTree。

  1. 首先创建一个分布式数据库

create database test on cluster default_cluster;
  1. 创建本地表

由于clickhouse是分布式的,创建本地表本来应该在每个节点上创建的,但是指定on cluster关键字可以直接完成,建表语句如下:

CREATE TABLE test.test_data_shade on cluster default_cluster
(
    `data` Map(String, String),
    `uid` String,
    `remote_addr` String,
    `time` Datetime64,
    `status` Int32,
    ...其它字段省略
    `dt` String
)
ENGINE = ReplicatedMergeTree()
partition by dt
order by (dt, sipHash64(uid));

这里表引擎为ReplicatedMergeTree,即有副本的表,根据dt按天分区,提升查询效率,sipHash64是一个hash函数,根据uid散列使得相同uid数据在同一个分片上面,如果有去重需求,速度更快,因为可以计算每个分片去重,再汇总一下即可。

  1. 创建分布式表

CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed('default_cluster', 'test', 'test_data_shade', sipHash64(uid));

在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表写入或读取数据,Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

通过jdbc写入

这个我是看的官方文档,里面有2种选择,感兴趣的同学可以都去尝试一下。

68fc99a7a86ebe873db890e4d6aa50eb.png

这里贴一下我的Pom依赖

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1-patch</version>
    <classifier>shaded</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Flink主程序,消费kafka,做清洗,然后写入clickhouse,这都是常规操作,这里贴一下关键代码吧。

f6e990b5872aa3f0f82be21af17b7610.png

连接clickhouse有2种方式,8123端口的http方式,和基于9000端口的tcp方式。

这里官方推荐的是连接驱动是0.3.2:

<dependency>
    <!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch11</version>
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.

8fd33bb7dd32721db9c6669bb285f917.png

官方推荐升级到0.3.2,上面表格给出了升级方法,文档地址:

https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/51423.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]JAVA毕业设计高校疫情管理(系统+LW)

[附源码]JAVA毕业设计高校疫情管理&#xff08;系统LW&#xff09; 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&…

[附源码]计算机毕业设计springboot社区疫情防控信息管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Android OpenGL ES 学习(五) -- 渐变色

OpenGL 学习教程 Android OpenGL ES 学习(一) – 基本概念 Android OpenGL ES 学习(二) – 图形渲染管线和GLSL Android OpenGL ES 学习(三) – 绘制平面图形 Android OpenGL ES 学习(四) – 正交投屏 Android OpenGL ES 学习(五) – 渐变色 代码工程地址&#xff1a; https://…

【实习之velocity 三 Vtl-引入资源】

文章目录一、#include1.作用:引入外部资源&#xff0c;引入的资源不会被引擎所解析2.语法:#include(resource)二、#parse作用:引入的外部资源,引入的资源将被引擎所解析语法:#parse(resource)三、define作用:定义重用模块(不带参数)语法:四、evaluate作用:动态计算,动态计算可以…

学习笔记:内存四区

内存分区模型 1内存分区模型 C程序在执行I将内存大方向划分为4个区域 ●代码区:存放函数体的二进制代码&#xff0c;由操作系统进行管理的 ●全局区:存放全局变量和静态变量以及常量 ●栈区:由编译器自动分配释放存放函数的参数值局部变量等 ●堆区:由程序员分配和释放若程序员…

【WSN布局】基于LICHTENBERG算的多目标传感器选择和放置优化问题研究附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

如何改变胆小怕事的性格?

胆小怕事 就是指一个人做事总是十分胆怯&#xff0c;畏畏缩缩&#xff0c;举手投足也不够大气&#xff0c;经常要看别人的脸色&#xff0c;害怕麻烦上身&#xff0c;对强势的人也会显得唯唯诺诺&#xff0c;战战兢兢。胆小怕事 是某种心理问题的表现&#xff0c;或者说性格缺陷…

基于Q-learning方法的地铁列车时刻表重新调度

文章信息《Metro Train Timetable Rescheduling Based on Q-learning Approach》是发表在2020 IEEE 23rd International Conference on Intelligent Transportation Systems (ITSC)上的一篇文章。摘要在地铁系统中&#xff0c;不可预测的干扰会影响正常运行&#xff0c;给乘客带…

什么是短网址?如何调用接口生成短地址?

随着网络应用的深入和普及&#xff0c;网址资源越来越少了&#xff0c;长尾网址也派上用场了&#xff0c;只是网址太长不方便识别与记录。因此&#xff0c;就有了短网址替代长网址的技术接口。 随着SEO的重要性越来越明显&#xff0c;在推广的时候如果把网页链接缩短可以获得更…

【wireshark】如何获取一个设备的IP地址

问题 开发中往往会出现无法知道设备正确的IP地址&#xff0c;从而无法连接到设备。 解决方式&#xff1a; 使用软件工具wireshark来获取设备IP地址。 可以实现不同网段捕获设备IP 具体流程&#xff1a; 1. 下载wireshark抓包程序 https://www.wireshark.org/download.htm…

MySQL日志(undo log 和 redo log 实现事务的原子性/持久性/一致性)

日志的重要性 日志绝对是数据库的核心. 持久化的日志记录了各种重要的信息.数据的恢复需要依赖日志。 慢查询sql语句需要用到慢查询日志。以及错误日志中保存着mysqld数据库服务端在启动过程中发生的重大错误信息... 数据库重要组成 本质上来说是一个文件系统 (两大重要组…

PHP+MySQL基于thinkphp的企业信息销售展示系统的设计

公司企业网站,是一个供为企业推广的平台,是完全的,高速的,开放的,其核心思想是提供一个以自然语言为主的用户界面,让用户能够更好的刚加方便快捷的管理物流信息的一个渠道和平台。本课题的开发工具可以使用PHP开发语言和MySQL数据进行的开发。 该系统的基本功能包括用户注册登录…

JAVA 设计模式篇

JAVA 设计模式篇1、UML类图2、设计原则2.1、开闭原则2.2、里氏代换原则2.3、依赖倒转原则2.4、接口隔离原则2.5、迪米特法则2.6、合成复用原则3、设计模式3.1、单例模式3.1.1、单例模式实现——饿汉式3.1.1.1、静态变量实现3.1.1.2、静态方法实现3.1.1.3、枚举方式3.1.2、单例模…

GRPC远程调用

FAQ | gRPC1. gRPC原理 FAQ | gRPC Asynchronous-API tutorial | C | gRPC 1.1 什么是RPC RPC 即远程过程调用协议&#xff08;Remote Procedure Call Protocol&#xff09;&#xff0c;可以让我们像调用本地对象一样发起远程调用。RPC 凭借其强大的治理功能&#xff0c;成…

Linux 中的文件简单说明

Linux 中的文件简单说明 作者&#xff1a;Grey 原文地址&#xff1a; 博客园&#xff1a;Linux 中的文件简单说明 CSDN&#xff1a;Linux 中的文件简单说明 说明 本文基于 CentOS 7 根目录(/)下文件夹主要作用 [rootlinux /]# ll / total 16 lrwxrwxrwx. 1 root root…

VMOS虚拟机开源,游戏安全面临新挑战

相信大家对虚拟机并不陌生&#xff0c;一台设备可以模拟出多个操作系统&#xff0c;完美解决了不同场景下设备限制问题&#xff0c;还节约了购买软硬件设备的成本&#xff0c;为工作和生活提供了不少便利&#xff0c;得到了广泛的应用。 而虚拟机技术却被游戏黑灰产所利用&…

全球约有 150 亿台设备在运行 Java,收费后还能用吗?

据估算&#xff0c;全球约有 150 亿台设备在运行 Java™。约900万 Java 程序员.... https://www.oracle.com/java/technologies/downloads/archive 一、Java8及之前的版本均免费 我们可以看到上图中绿色的部分均是免费版本大家可以随便下载随便使用。 二、最后的免费版本 jd…

将光耦合进入单模光纤的最佳工作距离

摘要 光纤是现代光学系统中最通用的部件之一。它们最重要的特点之一是它们能够在远距离&#xff08;甚至几公里&#xff09;内以极低的损耗传输光能。另一方面&#xff0c;以一种能够达到尽可能高的效率的方式将光耦合到光纤中通常是一项非常精细的需求&#xff1a;例如&…

美苏太空竞赛历年卫星火箭发射以及历史事件介绍

1957 时间苏联美国折叠时间7月16日在与政府官员的会晤中&#xff0c;科罗廖夫和格鲁什科提出了开发超重型火箭的想法。美国海军对先锋号火箭进行试射。5月1日10月4日卫星号火箭发射了斯普特尼克1号&#xff0c;即第一颗人造卫星。美国海军对先锋号火箭进行试射。10月23日11月3…

ArcGIS API4.X + API文档 本地部署(Tomcat)

前言&#xff1a; js.arcgis.com有时候不太稳定&#xff0c;导致项目或者自己测试代码需要等待远程资源请求&#xff0c;体验感及其不好&#xff0c;能自己掌控的资源最好就别去拿在线的&#xff0c;当然服务器稳定就另当别论。&#xff08;所以本地部署有两种含义&#xff1a;…