数据仓库环境准备完整使用
- 一、IDEA 开发环境准备
- 1、创建项目gmall-realtime
- 2、删除当前项目的src目录并创建gmall-realtime模块
- 3、创建子项目
- 4、导入依赖
- 5、创建相关的包
- 6、在 resources 目录下创建 log4j.properties 文件,写入如下内容
- 二、数据仓库运行环境(ODS)
- 1、Flink 环境搭建
- 2、Hbase 环境搭建
- 1)Hbase 集群部署
- 2)IDEA Phoenix 环境准备
- (1)引入 Phoenix Thick Client 依赖
- (2)在 resources 目录下创建 hbase-site.xml 文件,并在文件中添加如下内容
- 3) ClickHouse 环境搭建
- 4)模拟数据准备(ODS)
- (1) 用户行为日志
- (2) 业务数据
- (1)维度数据首日全量同步
- (2)业务数据生成
- 三、数仓开发之DIM层(DIM)
- 1、总结概括下面优化方案总结
- 2、配置表
- 1) Flink CDC
- 2) 配置表设计
- (1) 字段解析
- (2) 在Mysql中创建数据库建表并开启Binlog
- 3、主要任务
- 1) 接收Kafka数据,过滤空值数据
- 2) 动态拆分维度表功能
- 3) 把流中的数据保存到对应的维度表
- 4、代码实现实现
- 1) 接收Kafka数据,过滤空值数据
- (1) 创建 KafkaUtil 工具类
- (2) 主程序
- 2) 根据MySQL的配置表,动态进行分流
- (1) 导入依赖
- (2) 创建配置表实体类
- (3) 编写操作读取配置表形成广播流
- (4) 定义一个项目中常用的配置常量类GmallConfig
- 3) 自定义函数MyBroadcastFunction
- (1) 定义类MyBroadcastFunction
- (2) 自定义函数MyBroadcastFunction-open
- (3) 自定义函数MyBroadcastFunction-processBroadcastElement
- (4) 自定义函数MyBroadcastFunction-checkTable
- (5) 自定义函数MyBroadcastFunction-processElement()
- (6) 自定义函数MyBroadcastFunction-filterColumns(),校验字段,过滤掉多余的字段
- (7) 主程序DimSinkApp中调用MyBroadcastFunction提取维度数据
- 4) 保存维度到HBase(Phoenix)
- (1) 程序流程分析
- (2) 创建 PhoenixUtil 工具类,在其中创建insertValues()方法
- (3) MyPhoenixSink
- (4) 主程序 DimSinkApp 中调用 MyPhoenixSink
- (5) 测试
一、IDEA 开发环境准备
1、创建项目gmall-realtime
2、删除当前项目的src目录并创建gmall-realtime模块
3、创建子项目
4、导入依赖
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--如果保存检查点到hdfs上,需要引入此依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. -->
<!-- 打包时不复制META-INF下的签名文件,避免报非法签名文件的SecurityExceptions异常-->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<!-- connector和ormat依赖的工厂类打包时会相互覆盖,需要使用ServicesResourceTransformer解决-->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
5、创建相关的包
6、在 resources 目录下创建 log4j.properties 文件,写入如下内容
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
log4j.rootLogger=error,stdout
二、数据仓库运行环境(ODS)
1、Flink 环境搭建
Flink 集群部署请参考 之前的博客
2、Hbase 环境搭建
1)Hbase 集群部署
Flink 集群部署请参考 之前的博客
2)IDEA Phoenix 环境准备
Phoenix 安装参考本节1)中文档。IDEA Phoenix 环境准备操作如下:
(1)引入 Phoenix Thick Client 依赖
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
(2)在 resources 目录下创建 hbase-site.xml 文件,并在文件中添加如下内容
hbase-size.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop102:8020/HBase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>hbase.wal.provider</name>
<value>filesystem</value>
</property>
<!-- 注意:为了开启hbase的namespace和phoenix的schema的映射,在程序中需要加这个配置文件,另外在linux服务上,也需要在hbase以及phoenix的hbase-site.xml配置文件中,加>上以上两个配置,并使用xsync进行同步(本节1中文档已有说明)。-->
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
</configuration>
3) ClickHouse 环境搭建
ClickHouse 安装请参考之前的
4)模拟数据准备(ODS)
通常企业在开始搭建数仓时,业务系统中会存在历史数据,一般是业务数据库存在历史数据,而用户行为日志无历史数据。假定数仓上线的日期为2022-02-21,为模拟真实场景,需准备以下数据。
(1) 用户行为日志
用户行为日志,一般是没有历史数据的,故日志只需要准备2022-02-21一天的数据。具体操作如下:
(1)启动 Kafka。
(2)启动一个命令行 Kafka 消费者,消费 topic_log 主题的数据。
(3)修改两个日志服务器(hadoop102、hadoop103)中的
/opt/module/applog/application.yml配置文件,将mock.date参数改为2022-02-21。
(4)执行日志生成脚本lg.sh。
(5)观察命令行 Kafka 消费者是否消费到数据。
(2) 业务数据
实时计算不考虑历史的事实数据,但要考虑历史维度数据。因此要对维度相关的业务表做一次全量同步。
(1)维度数据首日全量同步
①修改Maxwell配置文件中的mock_date参数
[atguigu@hadoop102 maxwell]$ vim /opt/module/maxwell/config.properties
mock_date=2022-02-21
②启动业务数据采集通道,包括Maxwell、Kafka
③编写业务数据首日全量脚本
与维度相关的业务表如下
activity_info
activity_rule
activity_sku
base_category1
base_category2
base_category3
base_province
base_region
base_trademark
coupon_info
coupon_range
financial_sku_cost
sku_info
spu_info
user_info
切换到 /home/atguigu/bin 目录,创建 mysql_to_kafka.sh 文件
[atguigu@hadoop102 maxwell]$ cd ~/bin
[atguigu@hadoop102 bin]$ vim mysql_to_kafka_init.sh
在脚本中添加如下内容
#!/bin/bash
# 该脚本的作用是初始化所有的业务数据,只需执行一次
MAXWELL_HOME=/opt/module/maxwell
import_data() {
$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}
case $1 in
"activity_info")
import_data activity_info
;;
"activity_rule")
import_data activity_rule
;;
"activity_sku")
import_data activity_sku
;;
"base_category1")
import_data base_category1
;;
"base_category2")
import_data base_category2
;;
"base_category3")
import_data base_category3
;;
"base_province")
import_data base_province
;;
"base_region")
import_data base_region
;;
"base_trademark")
import_data base_trademark
;;
"coupon_info")
import_data coupon_info
;;
"coupon_range")
import_data coupon_range
;;
"financial_sku_cost")
import_data financial_sku_cost
;;
"sku_info")
import_data sku_info
;;
"spu_info")
import_data spu_info
;;
"user_info")
import_data user_info
;;
"all")
import_data activity_info
import_data activity_rule
import_data activity_sku
import_data base_category1
import_data base_category2
import_data base_category3
import_data base_province
import_data base_region
import_data base_trademark
import_data coupon_info
import_data coupon_range
import_data financial_sku_cost
import_data sku_info
import_data spu_info
import_data user_info
;;
esac
增加执行权限
[atguigu@hadoop102 bin]$ chmod +x mysql_to_kafka_init.sh
执行脚本
[atguigu@hadoop102 bin]$ mysql_to_kafka_inc_init.sh all
④启动 Kafka 消费者,观察数据是否写入 Kafka
[atguigu@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db
(2)业务数据生成
业务数据生成之前要将 application. properties 文件中的 mock.date 参数修改为业务时间,首日应设置为 2022-02-21。
mock.clear 和 mock.clear.user 均为 0,表示不重置业务数据和用户数据。如下
同时要保证 Maxwell 配置文件 config.properties 中的 mock.date 参数和 application. properties 中 mock.date 参数的值保持一致。如下。
三、数仓开发之DIM层(DIM)
1、总结概括下面优化方案总结
DIM层
数据源:kafka---topic_db(包含所有的业务表)
过滤数据:过滤出所需要的维表数据
过滤条件:在代码中给定十几张维表的表名
问题:如果增加维表,需要修改代码-重新编译-打包-上传、重启任务
优化1:不修改代码、只重启任务
配置信息中保存需要的维表信息、配置信息只在程序启动的时候加载一次
优化2:不修改代码、不只重启任务
方向:让程序在启动的以后还可以获取配置信息中增加的内容
具体实施:
1) 定时任务:每隔一段时间加载一次配置信息
将定时任务写在Open方法
2) 监控配置信息: 一旦配置信息增加了数据,可以立马获取到
(1) MySQLBinglog: FlinkCDC 监控直接创建流
a.将配置信息处理成广播流:缺点-> 如果配置信息过大,冗余太多
b.按照表名进行keyBy处理:缺点-> 有可能产生数据倾斜
(2) 文件:Flum->kafka->Flink 消费创建流
写出数据:将数据写出到Phoenix
JdbcSink、自定义Sink
DIM层设计要点:
(1)
DIM层的设计依据是维度建模理论
,该层存储维度模型的维度表
。
(2)DIM层的数据存储在 HBase 表中DIM 层表是用于维度关联的,要通过主键去获取相关维度信息,这种场景下 K-V 类型数据库的效率较高。常见的 K-V 类型数据库有 Redis、HBase,而 Redis 的数据常驻内存,会给内存造成较大压力,因而选用 HBase 存储维度数据。
(3)DIM层表名的命名规范为dim_表名
2、配置表
本层的任务是将业务数据直接写入到不同的 HBase 表中。那么如何让程序知道流中的哪些数据是维度数据?维度数据又应该写到 HBase 的哪些表中?为了解决这个问题,我们选择在 MySQL 中构建一张配置表,通过 Flink CDC 将配置表信息读取到程序中。
1) Flink CDC
Flink CDC 的介绍及配置请参考下面
2) 配置表设计
(1) 字段解析
我们将为配置表设计五个字段
source_table:作为数据源的业务数据表名
sink_table:作为数据目的地的 Phoenix 表名
sink_columns:Phoenix 表字段
sink_pk:Phoenix 表主键
sink_extend:Phoenix 建表扩展,即建表时一些额外的配置语句
将 source_table 作为配置表的主键,可以通过它获取唯一的目标表名、字段、主键和建表扩展,从而得到完整的 Phoenix 建表语句。
(2) 在Mysql中创建数据库建表并开启Binlog
(1)创建数据库 gmall_config ,注意:和 gmall 业务库区分开
[atguigu@hadoop102 db_log]$ mysql -uroot -p000000 -e"create database gmall_config charset utf8 default collate utf8_general_ci"
(2)在 gmall_config 库中创建配置表 table_process
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
(3)在MySQL配置文件中增加 gmall_config 开启Binlog
参考以往开启的开启Binlog
3、主要任务
1) 接收Kafka数据,过滤空值数据
对Maxwell抓取的数据进行ETL,有用的部分保留,没用的过滤掉。
2) 动态拆分维度表功能
由于Maxwell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理
。所以需要把各个维度表拆开处理。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。
这样的配置不适合写在配置文件中,因为这样的话
,业务端随着需求变化每增加一张维度表表,就要修改配置重启计算程序。所以这里需要一种动态配置方案
,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。这种可以有三个方案实现:1、
一种是用Zookeeper存储,通过Watch感知数据变化
;
2、另一种是用mysql数据库存储,周期性的同步
;
3、再一种是用mysql数据库存储,使用广播流
。
这里选择第三种方案,主要是MySQL对于配置数据初始化和维护管理,使用FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。
所以就有了如下图:
3) 把流中的数据保存到对应的维度表
维度数据保存到HBase的表中
4、代码实现实现
1) 接收Kafka数据,过滤空值数据
(1) 创建 KafkaUtil 工具类
和 Kafka 交互要用到 Flink 提供的 FlinkKafkaConsumer、FlinkKafkaProducer 类,为了提高模板代码的复用性,将其封装到 KafkaUtil 工具类中。
此处从 Kafka 读取数据,创建 getKafkaConsumer(String topic, String groupId) 方法
package org.example.utils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
/**
* kafka的链接
*/
public class MyKafkaUtil {
private static String KAFKA_SERVER = "hadoop102:9092";
public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//消费出来是反序列化(SimpleStringSchema)
return new FlinkKafkaConsumer<String>(
topic,
new KafkaDeserializationSchema<String>() {
@Override
public boolean isEndOfStream(String s) {
//isEndOfStream 是否是流的最后一个
//无界的流永远false
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if (record == null || record.value() == null) {
//为了解决空指针问题、给他设置空字符串、后面会自动过滤掉这个
return "";
} else {
return new String(record.value());
}
}
@Override
public TypeInformation<String> getProducedType() {
//TypeInformation<String> 数据类型
return BasicTypeInfo.STRING_TYPE_INFO;
}
},
properties
);
}
}
(2) 主程序
package org.example.app.dim;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.example.app.func.DimSinkFunction;
import org.example.app.func.TableProcessFunction;
import org.example.bean.TableProcess;
import org.example.utils.MyKafkaUtil;
import java.util.Properties;
/**
* 有两个流
* 一个主流: 来自KAFKA----topic_db -maxwell写进去的
* 配置流通过FlinkCDC直接读的
*
* // 数据流:web/app -> nginx -> 业务服务器 ->Mysql(binlog) -> Maxwell ->kafka(ODS) -> FlinkApp ->Phoenix
* //程 序: Mock -> Mysql(binlog) -> Maxwell ->Kafka(ZK) ->DimApp ->Phoenix(HBase/ZK/HDFS)
*/
public class DimApp {
public static void main(String[] args) throws Exception {
//1、获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); //生产环境中设置为Kafka 主题分区数
//1.1 开启Checkpoint
// env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE); //5分钟开启一次
// env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L); //超时10分钟
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); //共存的有几个Checkpoint
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5)); //总共会尝试3次重启、每隔5秒尝试一次(固定匹配重启)
//1.2 设置状态后端(本地的内存级别的)
// env.setStateBackend(new HashMapStateBackend());
// env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8082/211126/ck");
// System.setProperty("HADOOP_USER_NAME", "atguigu");
//2、读取kafka topic_db 主题数据创建主流
String topic = "topic_db";
String groupId = "dim_app";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));
//3、过滤掉非JSON数据 &以及保留新增、变化以及初始化数据并将数据转换为JSON格式
SingleOutputStreamOperator<JSONObject> filterJsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
/**
* TODO 1、转换成json
* TODO 2、过滤
* @param value
* @param out
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<JSONObject> out) throws Exception {
try {
//将数据转换为JSON
JSONObject jsonObject = JSON.parseObject(value);
//获取数据中的操作类型字段
String type = jsonObject.getString("type");
//保留新增、变换以及初始化数据
if ("insert".equals(type) || "update".equals(type) || "bootstrap-insert".equals(type)) {
out.collect(jsonObject);
}
} catch (Exception e) {
System.out.println("发现数据异常");
}
}
});
//9、启动任务
env.execute("DimApp");
}
}
2) 根据MySQL的配置表,动态进行分流
(1) 导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 如果不引入 flink-table 相关依赖,则会报错:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.connector.base.source.reader.RecordEmitter
引入以下依赖可以解决这个问题(引入某些其它的 flink-table相关依赖也可)
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.13.0</version>
</dependency>
(2) 创建配置表实体类
package com.atguigu.gmall.realtime.bean;
import lombok.Data;
@Data
public class TableProcess {
//来源表
String sourceTable;
//输出表
String sinkTable;
//输出字段
String sinkColumns;
//主键字段
String sinkPk;
//建表扩展
String sinkExtend;
}
(3) 编写操作读取配置表形成广播流
Properties prop = new Properties();
prop.setProperty("useSSL", "false");
//4、使用FlinkCDC读取Mysql配置信息表创建配置流
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("000000")
.databaseList("gmall-config")
.tableList("gmall-config.table_process")
.startupOptions(StartupOptions.initial())//全部都要历史数据
.serverTimeZone("UTC")
.deserializer(new JsonDebeziumDeserializationSchema())
.jdbcProperties(prop)
.build();
DataStreamSource<String> mySqlSourceDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");
// mySqlSourceDS.print(">>>");
//5、将配置流处理为广播流(map状态描述器)
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
//得到广播流
BroadcastStream<String> broadcastStream = mySqlSourceDS.broadcast(mapStateDescriptor);
//6、连接主流与广播流d
BroadcastConnectedStream<JSONObject, String> connectedStream = filterJsonObjDS.connect(broadcastStream);
(4) 定义一个项目中常用的配置常量类GmallConfig
package com.atguigu.gmall.realtime.common;
public class GmallConfig {
// Phoenix库名
public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
// Phoenix驱动
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
// Phoenix连接参数
public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
}
3) 自定义函数MyBroadcastFunction
(1) 定义类MyBroadcastFunction
自定义广播流
package com.atguigu.gmall.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class MyBroadcastFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private MapStateDescriptor<String, TableProcess> tableConfigDescriptor;
public MyBroadcastFunction(MapStateDescriptor<String, TableProcess> tableConfigDescriptor) {
this.tableConfigDescriptor = tableConfigDescriptor;
}
@Override
public void processElement(JSONObject jsonObj, ReadOnlyContext readOnlyContext, Collector<JSONObject> out) throws Exception {
}
@Override
public void processBroadcastElement(String jsonStr, Context context, Collector<JSONObject> out) throws Exception {
}
}
(2) 自定义函数MyBroadcastFunction-open
// 定义Phoenix的连接
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("parameters....: " + parameters);
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
System.out.println("connection....: " + connection);
}
(3) 自定义函数MyBroadcastFunction-processBroadcastElement
/**
* >>>> {"before":null, "after":{"source_table":"aa","sink_table":"aaa","sink_columns":"aa","sink_pk":"aa","sink_extend":"aa"},
* "source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1670134486528,"snapshot":"false",
* "db":"gmall-config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},
* "op":"r","ts_ms":1670134486531,"transaction":null}
* 广播流数据、配置流
* <p>
* todo 1、获取并解析数据,方便主流操作
* todo 2、校验表是否存在,如果不存在则需要在Phoenix中建表 checkTable
* todo 3、写入状态,广播出去 (处理广播流数据)
*
* @param value
* @param ctx
* @param collector
* @throws Exception
*/
@Override
public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> collector) throws Exception {
//TODO 1、获取并解析数据
JSONObject jsonObject = JSON.parseObject(value);
TableProcess tableProcess = JSON.parseObject(jsonObject.getString("after"), TableProcess.class);
//TODO 2、校验并建表
checkTable(tableProcess.getSinkTable(),
tableProcess.getSinkColumns(),
tableProcess.getSinkPk(),
tableProcess.getSinkExtend());
//TODO 3、写入状态,广播出去--(getBroadcastState获取广播状态)
BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
broadcastState.put(tableProcess.getSourceTable(), tableProcess);
}
(4) 自定义函数MyBroadcastFunction-checkTable
在 Phoenix 建表之前要先创建命名空间 GMALL_REALTIM。
0: jdbc:phoenix:> create schema GMALL2022_REALTIME;
checkTable() 方法如下
/**
* 校验并建表
*
* @param sinkTable Phoenix表名
* @param sinkColumns Phoenix表字段
* @param sinkPk Phoenix表主键
* @param sinkExtend Phoenix扩展字段
*/
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
PreparedStatement preparedStatement = null;
try {
//拼接SQL
//处理特殊字段
if (sinkPk == null || "".equals(sinkPk)) {
sinkPk = "id";
}
if (sinkExtend == null || "".equals(sinkExtend)) {
sinkExtend = "";
}
//拼接SQL: create table if not exists db.tn(id varchar primary key,bb varchar,cc varchar ) xxx
StringBuilder createTableSql = new StringBuilder("create table if not exists ")
.append(GmallConfig.HBASE_SCHEMA)
.append(".")
.append(sinkTable)
.append("(");
String[] columns = sinkColumns.split(",");
for (int i = 0; i < columns.length; i++) {
//取出字段
String column = columns[i];
//判断是否为主键
if (sinkPk.equals(column)) {
createTableSql.append(column).append(" varchar primary key");
} else {
createTableSql.append(column).append(" varchar");
}
//判断是否为最后一个字段 ---只要不是最后一个都要拼接一个逗号
if (i < columns.length - 1) {
createTableSql.append(",");
}
}
createTableSql.append(")").append(sinkExtend);
//编译SQL
System.out.println("建表语句: " + createTableSql);
preparedStatement = connection.prepareStatement(createTableSql.toString());
//执行SQL,建表
preparedStatement.execute();
} catch (SQLException e) {
//编译异常转换为运行时异常、主要是解决、出问题、程序必须停止
throw new RuntimeException("建表失败" + sinkTable);
} finally {
//释放资源
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
(5) 自定义函数MyBroadcastFunction-processElement()
/**
* 主流
* {"database":"gmall-211126-flink","table":"base_trademark","type":"bootstrap-insert","ts":1669877319,
* "data":{"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}}
* <p>
* todo 1、获取广播流的配置数据
* todo 2、过滤字段 filterColumn
* todo 3、补充SinkTable字段输出
*
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
//1、获取广播的配置数据
ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String table = value.getString("table"); //只会获取维表数据
TableProcess tableProcess = broadcastState.get(table);
System.out.println(".....tableProcess"+tableProcess);
System.out.println(".....table"+table);
if (tableProcess != null) {
//2、过滤字段
filterColumn(value.getJSONObject("data"), tableProcess.getSinkColumns());
//3、补充SinkTable并写出到流中
value.put("sinkTable", tableProcess.getSinkTable());
out.collect(value);
} else {
System.out.println("找不到对应的key: " + table);
}
}
(6) 自定义函数MyBroadcastFunction-filterColumns(),校验字段,过滤掉多余的字段
/**
* 过滤字段
*
* @param data "data":{"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}
* @param sinkColumns "id,tm_name"
*/
private void filterColumn(JSONObject data, String sinkColumns) {
//切分sinkColumns
String[] columns = sinkColumns.split(",");
List<String> columnList = Arrays.asList(columns);
// Set<Map.Entry<String, Object>> entrySet = data.entrySet();
// Iterator<Map.Entry<String, Object>> iterator = entrySet.iterator();
// while (iterator.hasNext()) {
// Map.Entry<String, Object> next = iterator.next();
// if (!columnList.contains(next.getKey())) {
// iterator.remove();
// }
//
// }
Set<Map.Entry<String, Object>> entrySet = data.entrySet();
entrySet.removeIf(next -> !columnList.contains(next.getKey()));
}
(7) 主程序DimSinkApp中调用MyBroadcastFunction提取维度数据
SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));
4) 保存维度到HBase(Phoenix)
(1) 程序流程分析
DimSink 继承了RickSinkFunction,这个function得分两条时间线:
一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行;
另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。
(2) 创建 PhoenixUtil 工具类,在其中创建insertValues()方法
package com.atguigu.gmall.realtime.util;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
public class PhoenixUtil {
/**
* Phoenix 表数据导入方法
*
* @param conn 连接对象
* @param sinkTable 写入数据的 Phoenix 目标表名
* @param data 待写入的数据
*/
public static void insertValues(Connection conn, String sinkTable, JSONObject data) {
// 获取字段名
Set<String> columns = data.keySet();
// 获取字段对应的值
Collection<Object> values = data.values();
// 拼接字段名
String columnStr = StringUtils.join(columns, ",");
// 拼接字段值
String valueStr = StringUtils.join(values, "','");
// 拼接插入语句
String sql = "upsert into " + GmallConfig.HBASE_SCHEMA
+ "." + sinkTable + "(" +
columnStr + ") values ('" + valueStr + "')";
// 为数据库操作对象赋默认值
PreparedStatement preparedSt = null;
// 执行 SQL
try {
preparedSt = conn.prepareStatement(sql);
preparedSt.execute();
// 提交事务
conn.commit();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象获取或执行异常");
} finally {
if (preparedSt != null) {
try {
preparedSt.close();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象释放异常");
}
}
}
}
}
(3) MyPhoenixSink
自定义 SinkFunction 子类 MyPhoenixSink,在其中调用 Phoenix 工具类的 insertValues(String sinkTable, JSONObject data) 方法,将维度数据写出到 Phoenix 的维度表中。为了提升效率,减少频繁创建销毁连接带来的性能损耗,创建连接池。
(1)添加德鲁伊连接池依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.16</version>
</dependency>
(2)连接池创建工具类
package com.atguigu.gmall.realtime.util;
import com.alibaba.druid.pool.DruidDataSource;
public class DruidDSUtil {
private static DruidDataSource druidDataSource;
public static DruidDataSource createDataSource() {
// 创建连接池
druidDataSource = new DruidDataSource();
// 设置驱动全类名
druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);
// 设置连接 url
druidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);
// 设置初始化连接池时池中连接的数量
druidDataSource.setInitialSize(5);
// 设置同时活跃的最大连接数
druidDataSource.setMaxActive(20);
// 设置空闲时的最小连接数,必须介于 0 和最大连接数之间,默认为 0
druidDataSource.setMinIdle(1);
// 设置没有空余连接时的等待时间,超时抛出异常,-1 表示一直等待
druidDataSource.setMaxWait(-1);
// 验证连接是否可用使用的 SQL 语句
druidDataSource.setValidationQuery("select 1");
// 指明连接是否被空闲连接回收器(如果有)进行检验,如果检测失败,则连接将被从池中去除
// 注意,默认值为 true,如果没有设置 validationQuery,则报错
// testWhileIdle is true, validationQuery not set
druidDataSource.setTestWhileIdle(true);
// 借出连接时,是否测试,设置为 false,不测试,否则很影响性能
druidDataSource.setTestOnBorrow(false);
// 归还连接时,是否测试
druidDataSource.setTestOnReturn(false);
// 设置空闲连接回收器每隔 30s 运行一次
druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
// 设置池中连接空闲 30min 被回收,默认值即为 30 min
druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);
return druidDataSource;
}
}
(3)MyPhoenixSink 函数
package org.example.app.func;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.example.utils.DruidDSUtil;
import org.example.utils.PhoenixUtil;
public class DimSinkFunction extends RichSinkFunction<JSONObject> {
private DruidDataSource druidDataSource = null;
@Override
public void open(Configuration parameters) throws Exception {
druidDataSource = DruidDSUtil.createDataSource();
}
/**
* 每来一次数据调用一次
* {"database":"gmall-211126-flink","table":"base_trademark","type":"insert","ts":1669876072,"xid":1470,"commit":true,
* "data":{"id":13,"tm_name":"atguigu","logo_url":"aaa/aaa"},"sinkTable":"dim_xxx"}
* <p>
* {"database":"gmall-211126-flink","table":"base_trademark","type":"update","ts":1669876141,"xid":1630,"commit":true,
* "data":{"id":13,"tm_name":"atguigu","logo_url":"bbb/bbb"},"old":{"logo_url":"aaa/aaa"},"sinkTable":"dim_xxx"}
* <p>
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(JSONObject value, Context context) throws Exception {
//获取链接
DruidPooledConnection connection = druidDataSource.getConnection();
//写出数据
String sinkTable = value.getString("sinkTable");
JSONObject data = value.getJSONObject("data");
PhoenixUtil.upSerValues(connection, sinkTable, data);
//归还数据 --连接池 -不是真的把链接释放掉了
connection.close();
}
}
(4) 主程序 DimSinkApp 中调用 MyPhoenixSink
// TODO 9. 将数据写入 Phoenix 表
dimDS.addSink(new MyPhoenixSink());
(5) 测试
(1)启动HDFS、ZK、Kafka、Maxwell、HBase
(2)运行 IDEA 中的 DimSinkApp
(3)执行 mysql_to_kafka_init.sh 脚本
mysql_to_kafka_init.sh all
(4)通过phoenix查看hbase的schema以及表情况