15_基于Flink将pulsar数据写入到ClickHouse

news2024/11/25 4:46:16

3.8.基于Flink将数据写入到ClickHouse

编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作

3.8.1.ClickHouse基本介绍

ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。
在这里插入图片描述
结论: ClickHouse像很多OLAP数据库一样,单表查询速度由于关联查询,而且ClickHouse的两者差距更为明显。

3.8.2.ClickHouse安装步骤

本项目中,我们仅需要安装单机测试版本即可使用(node2安装), 在实际生产中, 大家可以直接将分布式集群版本

  • 1-设置yum源
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
  • 2- 直接基于yum安装即可
sudo yum install clickhouse-server clickhouse-client
  • 3-修改配置文件
vim /etc/clickhouse-server/config.xml 
修改178行: 打开这一行的注释 
<listen_host>::</listen_host>

在这里插入图片描述

  • 4-启动clickhouse的server
systemctl start clickhouse-server 
停止:
systemctl stop clickhouse-server 
重启
systemctl restart clickhouse-server
  • 5-进入客户端
    在这里插入图片描述

3.8.3.在ClickHouse中创建目标表

create database itcast_ck; 
use itcast_ck; 
create table itcast_ck.itcast_ck_ems( 
id int, 
sid varchar(128), 
ip varchar(128), 
create_time varchar(128), 
session_id varchar(128), 
yearInfo varchar(128), 
monthInfo varchar(128), 
dayInfo varchar(128), 
hourInfo varchar(128), 
seo_source varchar(128), 
area varchar(128), 
origin_channel varchar(128), 
msg_count int(128), 
from_url varchar(128), 
PRIMARY KEY (`id`) 
) ENGINE=ReplacingMergeTree();

3.8.4.编写Flink代码完成写入到CK操作

import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;

import java.sql.Types;
import java.util.Properties;

// 基于Flink完成读取Pulsar中数据将消息数据写入到clickhouse中
public class ItcastFlinkToClickHouse {

    public static void main(String[] args) throws Exception {
        //1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. 添加Source组件, 从Pulsar中读取消息数据
        Properties props = new Properties();
        props.setProperty("topic","persistent://public/default/itcast_ems_tab");
        props.setProperty("partition.discovery.interval-millis","5000");
        FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>(
                "pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",
                JsonDeser.of(PulsarTopicPojo.class),props);
        //2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费
        pulsarSource.setStartFromLatest();

        DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);


        //2.2  转换数据操作: 将 PulsarTopicPojo 转换为ROW对象
        SingleOutputStreamOperator<Row> rowDataSteam = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {
            @Override
            public Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {

                return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),
                        pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),
                        pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),
                        pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());
            }
        });


        //2.3: 设置sink操作写入到CK操作
        String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

        JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder()
                .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
                .setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck")
                .setQuery(insertSql)
                .setBatchSize(1)
                .setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR)
                .build();

        tableSink.emitDataStream(rowDataSteam);


        //3. 提交执行
        env.execute("itcast_to_ck");

    }
}

3.9.HBase对接Phoenix实现即席查询

3.9.1.Phoenix安装操作

Phoenix是属于apache旗下的一款基于hbase的工具, 此工具提供一种全新的方式来操作hbase中数据(SQL),
同时Phoenix对hbase进行大量的优化工作, 能够让我们更加有效的操作hbase

整个安装操作, 大家可以参考资料中安装手册, 进行安装即可

3.9.2.在Phoenix中创建表

create view "itcast_h_ems" ( 
"id" integer primary key, 
"f1"."sid" varchar, 
"f1"."ip" varchar, 
"f1"."create_time" varchar, 
"f1"."session_id" varchar, 
"f1"."yearInfo" varchar, 
"f1"."monthInfo" varchar, 
"f1"."dayInfo" varchar, 
"f1"."hourInfo" varchar, 
"f1"."seo_source" varchar, 
"f1"."area" varchar, 
"f1"."origin_channel" varchar, 
"f1"."msg_count" integer, 
"f1"."from_url" varchar 
);

3.9.3.在Phoenix中类型说明

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/849745.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue手把手教学封装分页器

1.vue中前台 <template><div><h6>"start":{{ pageStartEnd.start }},"当前页"&#xff1a;{{ pagenow }}"end":{{ pageStartEnd.end }}</h6><!-- 如果点击上一页按钮&#xff0c;当前页减去1&#xff0c;并且如果当…

两个多选框(select)之间值的左右上下移动

<!DOCTYPE html> <html> <head><meta charset"utf-8"><title>两个多选框(select)之间值的左右上下移动</title> </head> <script src"https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>&…

vue基础知识二:你对SPA单页面的理解,它的优缺点分别是什么?如何实现SPA应用呢

一、什么是SPA SPA&#xff08;single-page application&#xff09;&#xff0c;翻译过来就是单页应用SPA是一种网络应用程序或网站的模型&#xff0c;它通过动态重写当前页面来与用户交互&#xff0c;这种方法避免了页面之间切换打断用户体验在单页应用中&#xff0c;所有必…

【AutoLayout案例1-按钮居中显示 Objective-C语言】

一、按钮居中显示 1.接下来,我们就用这个autoLayout,自动布局,给大家写一个,实现几个案例,给大家看一下 那么,首先,第一个,大家注意, 当我们使用autoLayout,自动布局的时候,我们新建一个项目, 这个新建的项目,里面有一个控制器,这个控制器,是不是默认,是四四…

Windows环境下通过脚本方式压缩并备份文件夹到其他数据盘

环境配置 压缩时需要使用7-zip进行调用&#xff0c;因此根据自己电脑进行安装 官网&#xff1a;https://www.7-zip.org/ 脚本文件 新建记事本文件&#xff0c;重命名为git_back_up.bat echo off rem 设置utf-8可以正常显示中文 chcp 65001 > nulrem 获取当前日期和时间&…

前端处理后端返回的数据中有\n\n字样的换行符标识

后端返回的数据&#xff1a; 上面圈着的部分就是\n&#xff0c;前端需要将数据进行换行&#xff0c;对于这类型的数据&#xff0c;在前端页面是需要进行稍微处理才能正常显示。如果没有经过处理&#xff0c;那么内容是不会在有换行符的位置进行换行显示的 解决办法1&#xff1…

解决GitHub的速度很慢的几种方式

1. GitHub 镜像访问 这里提供两个最常用的镜像地址&#xff1a; https://hub.njuu.cf/search https://www.gitclone.com/gogs/search/clonesearch 也就是说上面的镜像就是一个克隆版的 GitHub&#xff0c;你可以访问上面的镜像网站&#xff0c;网站的内容跟 GitHub 是完整同步…

VS2017 Visual Assist下载链接、安装流程及点击VA_X_Setup.exe没反应问题、未能正确加载问题解决

标题全为超链接 一、VS 2017 Visual Assist下载链接、安装流程 二、点击VA_X_Setup.exe没反应问题解决 三、VisualAssistX 安装失败 安装异常解决 四、安装成功时会在VS2017的项目栏扩展里看到VAssistX。点击VAssistX的属性项修改&#xff0c;取消勾选拼写报错&#xff0c…

【软件测试】银行项目,银行测试业务测试,有哪些侧重点?

前言 银行的软件测试是针对银行的软件系统&#xff08;如柜面系统、信贷系统&#xff09;和银行专用设备&#xff08;如ATM机、自助柜员机等&#xff09;进行的一系列测试工作。 银行测试人员的组成 目前银行测试人员分为行方人员和非行方人员&#xff08;外包&#xff09;。…

性能测试的结果如何解读和分析?

性能测试的结果如何解读和分析&#xff1f; 性能测试的结果需要进行细致的解读和分析&#xff0c;以便找出系统的瓶颈和问题&#xff0c;并提出改进建议。以下是一些常见的性能测试结果指标和解读方法&#xff1a; 1. 响应时间&#xff1a;响应时间是指系统处理请求所需的时间…

由于找不到vcruntime140_1.dll,无法继续执行代码(解决方法)

当我们在运行某个应用程序或游戏时&#xff0c;突然遇到了“找不到vcruntime140_1.dll”这个错误提示时&#xff0c;可能会感到有些困惑和沮丧。这个错误通常意味着我们的系统缺少了一个重要的运行库文件&#xff0c;即vcruntime140_1.dll&#xff0c;导致应用程序无法正常运行…

日志框架及其使用方法

log4j和logBack,同一个人写的&#xff0c;logBack为log4j的升级版&#xff0c;SpringBoot中默认集成logBack 作用&#xff1a;记录软件发布后的一些bug,以及数据是怎样被操作的 传统开发弊端&#xff1a; 1.日志直接输出在控制台&#xff0c;关闭控制台后&#xff0c;日志消…

【MySQL安装】卸载与安装MySQL 5.7.X版本

最近由于各种原因&#xff0c;需要重新安装MySQL。之前我的版本是8.0版本&#xff0c;现在装的5.7版本。记录一下自己的安装过程。 目录 1、卸载MySQL8.0 2、安装MySQL5.7 1、卸载MySQL8.0 如何彻底卸载MySQL_mysql 完全卸载_m0小麦麦的博客-CSDN博客相信不少小伙伴们在安装…

图的适配器

什么是图 图是一个由点的集合和边的集合所构成的数据结构。 图分为有向图和无向图。其中无向图也可以理解为有向图&#xff0c;所以可以认为所有的图都是有向图。 比方说&#xff0c;有这么一张图。其中a指向bc&#xff0c;b指向c&#xff0c;c指向p。边是带方向的&#xff0c;…

DOM基础获取元素+事件基础+操作元素

一.DOM简介 DOM&#xff0c;全称“Document Object Model&#xff08;文档对象模型&#xff09;”&#xff0c;它是由W3C定义的一个标准。 在实际开发中&#xff0c;我们有时候需要实现鼠标移到某个元素上面时就改变颜色&#xff0c;或者动态添加元素或者删除元素等。其实这些效…

揭秘bi数据分析系统:如何轻松掌握商业智能的秘密

在大数据时代的背景下&#xff0c;企业开始越来越重视数据分析的重要性。bi数据分析系统不仅可以帮助企业感知市场变化趋势&#xff0c;还可以实时监测并评估企业经营决策的效果&#xff0c;支持企业的持续发展。在国内&#xff0c;国产数据处理工具如瓴羊Quick BI等崛起&#…

揭秘:5个美国程序员与日本程序员的差异

大家好&#xff0c;这里是程序员晚枫。想了解更多精彩内容&#xff0c;快来关注程序员晚枫 今天以美国和日本程序员为例&#xff0c;给大家分享一下国外程序员的生活。 以下是五个美国程序员和日本程序员的的区别&#xff1a; 工作方式&#xff1a;美国程序员通常更注重自由和…

Scrum敏捷模型的三个角色!如何在线绘制Scrum敏捷模型图?

1. 什么是Scrum敏捷模型&#xff1f; Scrum是一种敏捷开发方法&#xff0c;用于管理和组织软件开发项目。它强调团队的自组织和迭代式开发&#xff0c;通过不断的反馈和调整来快速交付高质量的软件产品。 Scrum敏捷模型将项目分解为一系列短期的迭代周期&#xff0c;每一个…

查看module依赖树

可以通过两种方式 1、tasks------android------androidDependencies 打印结果如下&#xff1a; > Task :app:androidDependencies debug debugCompileClasspath - Dependencies for compilation --- org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.6.3jar --- org.…

WordPress 子主题(child theme)介绍

经常开发WordPress主题的朋友往往会遇到一个困惑&#xff0c;虽然主题提供了默认设置&#xff0c;也自带了不少自定义功能&#xff0c;可以满足大部分的场景使用&#xff0c;但毕竟众口难调&#xff0c;一些个性化的需求难免无法满足&#xff0c;这时就必须得修改主题文件来实现…