文章目录
- 一 分流Sink之建立维度表到HBase(Phoenix)
- 1 拼接建表语句
- (1)定义配置常量类
- (2)引入依赖
- (3)hbase-site.xml
- (4)在phoenix中执行
- (5)增加代码
- a TableProcessFunction
- b checkTable
- (6)测试
- 2 过滤字段
- (1)代码编写
- (2)测试
- (3)总结
- 二 分流Sink之保存维度数据到HBase(Phoenix)
- 1 程序执行流程
- 2 DimSink
- 3 BaseDBApp
- 4 测试
- 三 分流Sink之保存业务数据到Kafka主题
- 1 BaseDBApp
- 2 MyKafkaUtil
- 3 测试
- 4 总结
- 四 总结
- 五 附录:完整代码
- 0 BaseDBApp
- 1 MyKafkaUtil
- 2 GmallConfig
- 3 TableProcess
- 4 MyDeserializationSchemaFunction
- 5 TableProcessFunction
- 6 DimSink
一 分流Sink之建立维度表到HBase(Phoenix)
1 拼接建表语句
如果读取到的配置信息是维度数据,提前在hbase中通过Phoenix创建维度表。
(1)定义配置常量类
定义一个项目中常用的配置常量类GmallConfig。
package com.hzy.gmall.realtime.common;
/**
* 实时数仓中的常量类
*/
public class GmallConfig {
public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}
(2)引入依赖
<!--commons-beanutils是Apache开源组织提供的用于操作JAVA BEAN的工具包。
使用commons-beanutils,可以很方便的对bean对象的属性进行操作-->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
(3)hbase-site.xml
因为要用单独的schema,所以在Idea程序中加入hbase-site.xml。
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop101:8020/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop101,hadoop102,hadoop103</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>hbase.wal.provider</name>
<value>filesystem</value>
</property>
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
</configuration>
注意:为了开启hbase的namespace和phoenix的schema的映射,在程序中需要加这个配置文件,另外在linux服务上,也需要在hbase以及phoenix的hbase-site.xml配置文件中,加上以上后两个配置,并使用xsync进行同步。
/opt/module/hbase-2.0.5/conf 注意分发
/opt/module/phoenix-5.0.0/bin
重启Hbase服务
(4)在phoenix中执行
create schema GMALL2022_REALTIME;
# 在hbase查看是否创建成功
cd /opt/module/hbase-2.0.5/bin/
hbase shell
list_namespace
(5)增加代码
a TableProcessFunction
// 声明连接对象
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
// 注册驱动
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
// 获取连接
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
// 如果读取到的配置信息是维度数据,提前创建维度表
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){
checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);
}
// 将配置信息放到状态中
// 拼接key
String key = sourceTable + ":" + operateType;
broadcastState.put(key,tableProcess);
b checkTable
// 在处理配置数据时,提前建立维度表
// create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)
private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {
// 对主键进行空值处理
if (pk == null){
pk = "id";
}
// 对建表扩展进行空值处理
if (ext == null){
ext = "";
}
StringBuilder createSql = new StringBuilder("create table if not exists "+
GmallConfig.HBASE_SCHEMA + "." + tableName +"(");
String[] fieldArr = fields.split(",");
for (int i = 0; i < fieldArr.length; i++) {
String field = fieldArr[i];
// 判断是否为主键
if (field.equals(pk)){
createSql.append(field + " varchar primary key ");
}else {
createSql.append(field + " varchar ");
}
if(i < fieldArr.length - 1){
createSql.append(",");
}
}
createSql.append(")" + ext);
System.out.println("phoenix中的建表语句:" + createSql);
// 创建数据库操作对象
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(createSql.toString());
// 执行sql语句
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException("在phoenix中建表失败");
} finally {
// 释放资源
if(ps != null){
ps.close();
}
}
}
(6)测试
启动hadoop(等待安全模式关闭再进行下一步),zookeeper,kafka,hbase,phoenix,maxwell,在phoenix中查看表数据
!tables
select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;
2 过滤字段
数据在向下游传递之前,过滤掉不需要的字段,只保留配置表中sink_columns存在的字段。
(1)代码编写
if (tableProcess != null){
// 在配置表中找到了该操作对应的配置
// 判断是事实数据还是维度数据
String sinkTable = tableProcess.getSinkTable();
jsonObj.put("sink_table",sinkTable);
// 在向下游传递数据之前,将不需要的字段过滤掉
// 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
filterColumns(dataJsonObj,tableProcess.getSinkColumns());
String sinkType = tableProcess.getSinkType();
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
// 维度数据,放到维度侧输出流中
ctx.output(dimTag,jsonObj);
}else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
// 事实数据,放到主流中
out.collect(jsonObj);
}
}else {
// 在配置表中没有该操作对应的配置
System.out.println("No This Key In TableProcess:" + key);
}
// 过滤字段
private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {
// dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}
// sinkColumns : id,tm_name
String[] columnArr = sinkColumns.split(",");
// 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法
List<String> columnList = Arrays.asList(columnArr);
// 获取json中的每一个名值对(KV)
Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
// // 获取迭代器
// Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
// // 遍历,如果不包含则删除
// for (;it.hasNext();) {
// if(!columnList.contains(it.next().getKey())){
// it.remove();
// }
// }
entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));
}
(2)测试
开启相关环境,在表中添加或者删除数据,查看输出结果。
(3)总结
动态分流总结:
- 广播流数据处理,FlinkCDC从MySQL中读取配置信息
- 获取状态
- 读取配置**(维度表的创建)**
- 将配置封装为
Map<sourceTable:operateType,TableProcess>
放到状态中。
- 主流数据处理,maxwell从业务数据库中采集到的数据
- 获取状态
- 从状态中获取当前处理数据的配置信息(字段过滤)
- 根据配置信息进行分流(事实与维度)
- 维度表的创建
- 拼接建表语句
- 通过jdbc执行建表语句
- 测试
- schema和namespace的映射
- 拼接sql,空格处理
- 提前创建schema
- 字段过滤
- 获取保留字段,放到List集合中
- 获取dataJsonObj,转换为EntrySet
- 根据保留字段判断EntrySet遍历出来的entry,是否保留
二 分流Sink之保存维度数据到HBase(Phoenix)
1 程序执行流程
DimSink 继承了RichSinkFunction,这个function得分两条时间线。
- 一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行。
- 另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。
2 DimSink
package com.hzy.gmall.realtime.app.fun;
/**
* 将维度侧输出流的数据写到hbase(Phoenix)中
*/
public class DimSink extends RichSinkFunction<JSONObject> {
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
@Override
public void invoke(JSONObject jsonObj, Context context) throws Exception {
// 上游传递过来的数据格式如下:
// {"database":"gmall2022",
// "data":{"tm_name":"a","id":13},
// "commit":true,
// "sink_table":"dim_base_trademark",
// "type":"insert",
// "table":"base_trademark","
// ts":1670131087}
// 获取维度表表名
String tableName = jsonObj.getString("sink_table");
// 获取数据
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
// 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);
String upsertSql = genUpsertSql(tableName,dataJsonObj);
System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);
PreparedStatement ps = null;
try {
// 创建数据库操作对象
ps = conn.prepareStatement(upsertSql);
// 执行sql语句
ps.executeUpdate();
// 手动提交事务,phoenix的连接实现类不是自动提交事务
conn.commit();
}catch (SQLException e){
e.printStackTrace();
throw new RuntimeException("向phoenix维度表中插入数据失败了");
} finally {
// 释放资源
if (ps != null){
ps.close();
}
}
}
// 拼接插入语句
private String genUpsertSql(String tableName, JSONObject dataJsonObj) {
// id 10
// tm_name zs
String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +
" ("+ StringUtils.join(dataJsonObj.keySet(),",") +
") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";
return upsertSql;
}
}
3 BaseDBApp
//TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中
dimDS.addSink(new DimSink());
4 测试
开启必要环境,向base_tradmark表中添加一条数据,查看phoenix是否插入成功。
三 分流Sink之保存业务数据到Kafka主题
1 BaseDBApp
//TODO 9 将主流数据写回kafka的dwd层
realDS.addSink(
MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),
jsonObj.getJSONObject("data").toJSONString().getBytes());
}
})
);
2 MyKafkaUtil
package com.hzy.gmall.realtime.utils;
// 获取kafka的生产者
// 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
// public static FlinkKafkaProducer<String> getKafkaSink(String topic){
// return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
// }
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());
}
},props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
// 获取kafka的生产者
public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
3 测试
在base_trademark表中添加一条数据,查看程序输出结果,在phoenix中查看结果。
维度数据::3> {"database":"gmall2022","xid":24993,"data":{"tm_name":"c","id":14},"commit":true,"sink_table":"dim_base_trademark","type":"insert","table":"base_trademark","ts":1670150491}
向phoenix维度表中插入数据的sql:upsert into GMALL2022_REALTIME.dim_base_trademark (tm_name,id) values('c','14')
select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;
启动kafka消费者,kfkcon.sh dwd_order_info
,在order_info中修改一条数据,查看程序输出结果,在kafka中查看结果。
事实数据::3> {"database":"gmall2022","xid":25144,"data":{"delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00},"old":{"consignee":"苗冰"},"commit":true,"sink_table":"dwd_order_info","type":"update","table":"order_info","ts":1670150618}
kafka中输出内容
{"delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00}
4 总结
动态分流测试执行流程
-
需要启动的进程:zookeeper,kafka,maxwell,hdfs,hbase,BaseDBApp
-
当业务数据发生变化,maxwell会采集变化数据到ods层
-
BaseDBApp从ods层读取到变化数据,作为业务主流
-
BaseDBApp在启动时,会通过FlinkCDC读取配置表,作为广播流
-
业务流和广播流通过connect进行连接
-
对连接之后的数据通过process进行处理
- processElement
- processBroadcastElement
具体执行流程见一 2(3)总结
-
将维度侧输出流的数据写到Hbase中 – DimSink
- 拼接upsert
- 执行sql(手动提交事务)
-
将主流数据写回到kafka的dwd层
- 重写获取FlinkKafkaProducer的方法,自定义序列化的过程
- 将主流的数据写到kafka不同的主题中,并且保存精准一次性
四 总结
DWD的实时计算核心就是数据分流,其次是状态识别。在开发过程中使用了几个灵活度较强算子,比如RichMapFunction, ProcessFunction, RichSinkFunction。 那这几个算子何时会用到,如何进行选择,汇总见下表:
Function | 可转换结构 | 可过滤数据 | 侧输出 | open****方法 | 可以使用状态 | 输出至 |
---|---|---|---|---|---|---|
MapFunction | Yes | No | No | No | No | 下游算子 |
FilterFunction | No | Yes | No | No | No | 下游算子 |
RichMapFunction | Yes | No | No | Yes | Yes | 下游算子 |
RichFilterFunction | No | Yes | No | Yes | Yes | 下游算子 |
ProcessFunction | Yes | Yes | Yes | Yes | Yes | 下游算子 |
SinkFunction | Yes | Yes | No | No | No | 外部 |
RichSinkFunction | Yes | Yes | No | Yes | Yes | 外部 |
从对比表中能明显看出,Rich系列能功能强大,ProcessFunction功能更强大,但是相对的越全面的算子使用起来也更加繁琐。
五 附录:完整代码
0 BaseDBApp
package com.hzy.gmall.realtime.app.dwd;
public class BaseDBApp {
public static void main(String[] args) throws Exception {
//TODO 1 基本环境准备
//流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(4);
// //TODO 2 检查点设置
// //开启检查点
// env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);
// // 设置检查点超时时间
// env.getCheckpointConfig().setCheckpointTimeout(60000L);
// // 设置重启策略
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
// // 设置job取消后,检查点是否保留
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
// // 指定操作HDFS的用户
// System.setProperty("HADOOP_USER_NAME","hzy");
//TODO 3 从kafka中读取数据
//声明消费的主题以及消费者组
String topic = "ods_base_db_m";
String groupId = "base_db_app_group";
// 获取消费者对象
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
// 读取数据,封装成流
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//TODO 4 对数据类型进行转换 String -> JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);
//TODO 5 简单的ETL
SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(
new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonobj) throws Exception {
boolean flag =
jsonobj.getString("table") != null &&
jsonobj.getString("table").length() > 0 &&
jsonobj.getJSONObject("data") != null &&
jsonobj.getString("data").length() > 3;
return flag;
}
}
);
// filterDS.print("<<<");
//TODO 6 使用FlinkCDC读取配置表数据
//获取dataSource
Properties props = new Properties();
props.setProperty("scan.startup.mode","initial");
SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder()
.hostname("hadoop101")
.port(3306)
.username("root")
.password("123456")
// 可配置多个库
.databaseList("gmall2022_realtime")
///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
//注意:指定的时候需要使用"db.table"的方式
.tableList("gmall2022_realtime.table_process")
.debeziumProperties(props)
.deserializer(new MyDeserializationSchemaFunction())
.build();
// 读取数据封装流
DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);
// 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去
// 想要使用广播状态,状态描述器只能是map,使用map状态存储
MapStateDescriptor<String, TableProcess> mapStateDescriptor =
new MapStateDescriptor<>("table_process", String.class, TableProcess.class);
BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);
// 调用非广播流的connect方法,将业务流与配置流进行连接
BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);
//TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中
//声明维度侧输出流的标记
OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};
SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
new TableProcessFunction(dimTag,mapStateDescriptor)
);
// 获取维度侧输出流
DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);
realDS.print("事实数据:");
dimDS.print("维度数据:");
//TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中
dimDS.addSink(new DimSink());
//TODO 9 将主流数据写回kafka的dwd层
realDS.addSink(
MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),
jsonObj.getJSONObject("data").toJSONString().getBytes());
}
})
);
env.execute();
}
}
1 MyKafkaUtil
package com.hzy.gmall.realtime.utils;
/**
* 操作kafka工具类
*/
public class MyKafkaUtil {
private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
private static final String DEFAULT_TOPIC = "default_topic";
// 获取kafka的消费者
public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
// 定义消费者组
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),props);
}
// 获取kafka的生产者
// 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
// public static FlinkKafkaProducer<String> getKafkaSink(String topic){
// return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
// }
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());
}
},props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
// 获取kafka的生产者
public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
}
2 GmallConfig
package com.hzy.gmall.realtime.common;
/**
* 实时数仓中的常量类
*/
public class GmallConfig {
public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}
3 TableProcess
package com.hzy.gmall.realtime.beans;
import lombok.Data;
@Data
public class TableProcess {
//动态分流Sink常量 改为小写和脚本一致
public static final String SINK_TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
public static final String SINK_TYPE_CK = "clickhouse";
//来源表
String sourceTable;
//操作类型 insert,update,delete
String operateType;
//输出类型 hbase kafka
String sinkType;
//输出表(主题)
String sinkTable;
//输出字段
String sinkColumns;
//主键字段
String sinkPk;
//建表扩展
String sinkExtend;
}
4 MyDeserializationSchemaFunction
package com.hzy.gmall.realtime.app.fun;
public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {
// 反序列化
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
// 导入的是org.apache.kafka.connnect.data包
Struct valueStruct = (Struct) sourceRecord.value();
// 获取数据的来源
Struct afterStruct = valueStruct.getStruct("after");
// 获取数据库和表名的来源
Struct sourceStruct = valueStruct.getStruct("source");
// 获取数据库
String database = sourceStruct.getString("db");
// 获取表名
String table = sourceStruct.getString("table");
// 获取操作类型
// String op = valueStruct.getString("op");
String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
if(type.equals("create")){
type = "insert";
}
JSONObject jsonObj = new JSONObject();
jsonObj.put("database",database);
jsonObj.put("table",table);
jsonObj.put("type",type);
// 获取影响的数据
// 删除时,afterStruct为空
JSONObject dataJsonObj = new JSONObject();
if (afterStruct != null){
// schema获取源数据的格式,fields获取里面的各个元素
for (Field field : afterStruct.schema().fields()) {
String fieldName = field.name();
Object fieldValue = afterStruct.get(field);
dataJsonObj.put(fieldName,fieldValue);
}
}
// 删除操作会使得data属性不为空,但size为0
jsonObj.put("data",dataJsonObj);
// 向下游发送数据
collector.collect(jsonObj.toJSONString()) ;
}
// 指定类型
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
5 TableProcessFunction
package com.hzy.gmall.realtime.app.fun;
/**
* 实现动态分流
* 目前流中有两条流中的数据,使用以下两个方法分别进行处理
*/
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
// 声明维度侧输出流标签
private OutputTag<JSONObject> dimTag;
// 声明广播状态描述器
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
// 声明连接对象
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
// 注册驱动
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
// 获取连接
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.dimTag = dimTag;
this.mapStateDescriptor = mapStateDescriptor;
}
// 处理业务流中的数据,maxwell从业务数据库中采集到的数据
@Override
public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
String table = jsonObj.getString("table");
String type = jsonObj.getString("type");
// 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insert
if (type.equals("bootstrap-insert")){
type = "insert";
jsonObj.put("type",type);
}
// 拼接key
String key = table + ":" + type;
// 获取状态
ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
// 从状态中获取配置信息
TableProcess tableProcess = broadcastState.get(key);
if (tableProcess != null){
// 在配置表中找到了该操作对应的配置
// 判断是事实数据还是维度数据
String sinkTable = tableProcess.getSinkTable();
jsonObj.put("sink_table",sinkTable);
// 在向下游传递数据之前,将不需要的字段过滤掉
// 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
filterColumns(dataJsonObj,tableProcess.getSinkColumns());
String sinkType = tableProcess.getSinkType();
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
// 维度数据,放到维度侧输出流中
ctx.output(dimTag,jsonObj);
}else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
// 事实数据,放到主流中
out.collect(jsonObj);
}
}else {
// 在配置表中没有该操作对应的配置
System.out.println("No This Key In TableProcess:" + key);
}
}
// 过滤字段
private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {
// dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}
// sinkColumns : id,tm_name
String[] columnArr = sinkColumns.split(",");
// 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法
List<String> columnList = Arrays.asList(columnArr);
// 获取json中的每一个名值对(KV)
Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
// // 获取迭代器
// Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
// // 遍历,如果不包含则删除
// for (;it.hasNext();) {
// if(!columnList.contains(it.next().getKey())){
// it.remove();
// }
// }
entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));
}
// 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息
@Override
public void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
// 获取状态
BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
// 将json格式字符串转换为JSON对象
JSONObject jsonObj = JSONObject.parseObject(jsonStr);
// 获取配置表中的一条配置信息
// parseObject:将json格式字符串转化为json格式对象
// 第二个参数为将json字符串转化为何种格式的对象
TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);
// 业务数据库表名
String sourceTable = tableProcess.getSourceTable();
// 操作类型
String operateType = tableProcess.getOperateType();
// 数据类型 hbase -- 维度数据 kafka -- 事实数据
String sinkType = tableProcess.getSinkType();
// 指定输出目的地
String sinkTable = tableProcess.getSinkTable();
// 主键
String sinkPk = tableProcess.getSinkPk();
// 指定保留字段(列)
String sinkColumns = tableProcess.getSinkColumns();
// 指定建表扩展语句
String sinkExtend = tableProcess.getSinkExtend();
// 如果读取到的配置信息是维度数据,提前创建维度表
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){
checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);
}
// 将配置信息放到状态中
// 拼接key
String key = sourceTable + ":" + operateType;
broadcastState.put(key,tableProcess);
}
// 在处理配置数据时,提前建立维度表
// create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)
private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {
// 对主键进行空值处理
if (pk == null){
pk = "id";
}
// 对建表扩展进行空值处理
if (ext == null){
ext = "";
}
StringBuilder createSql = new StringBuilder("create table if not exists "+
GmallConfig.HBASE_SCHEMA + "." + tableName +"(");
String[] fieldArr = fields.split(",");
for (int i = 0; i < fieldArr.length; i++) {
String field = fieldArr[i];
// 判断是否为主键
if (field.equals(pk)){
createSql.append(field + " varchar primary key ");
}else {
createSql.append(field + " varchar ");
}
if(i < fieldArr.length - 1){
createSql.append(",");
}
}
createSql.append(")" + ext);
System.out.println("phoenix中的建表语句:" + createSql);
// 创建数据库操作对象
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(createSql.toString());
// 执行sql语句
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException("在phoenix中建表失败");
} finally {
// 释放资源
if(ps != null){
ps.close();
}
}
}
}
6 DimSink
package com.hzy.gmall.realtime.app.fun;
/**
* 将维度侧输出流的数据写到hbase(Phoenix)中
*/
public class DimSink extends RichSinkFunction<JSONObject> {
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
@Override
public void invoke(JSONObject jsonObj, Context context) throws Exception {
// 上游传递过来的数据格式如下:
// {"database":"gmall2022",
// "data":{"tm_name":"a","id":13},
// "commit":true,
// "sink_table":"dim_base_trademark",
// "type":"insert",
// "table":"base_trademark","
// ts":1670131087}
// 获取维度表表名
String tableName = jsonObj.getString("sink_table");
// 获取数据
JSONObject dataJsonObj = jsonObj.getJSONObject("data");
// 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);
String upsertSql = genUpsertSql(tableName,dataJsonObj);
System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);
PreparedStatement ps = null;
try {
// 创建数据库操作对象
ps = conn.prepareStatement(upsertSql);
// 执行sql语句
ps.executeUpdate();
// 手动提交事务,phoenix的连接实现类不是自动提交事务
conn.commit();
}catch (SQLException e){
e.printStackTrace();
throw new RuntimeException("向phoenix维度表中插入数据失败了");
} finally {
// 释放资源
if (ps != null){
ps.close();
}
}
}
// 拼接插入语句
private String genUpsertSql(String tableName, JSONObject dataJsonObj) {
// id 10
// tm_name zs
String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +
" ("+ StringUtils.join(dataJsonObj.keySet(),",") +
") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";
return upsertSql;
}
}