文章目录
- 一 根据MySQL的配置表,动态进行分流
- 1 自定义反序列化器
- (1)需求分析
- (2)代码实现
- 2 从配置表中读取数据
- (1)自定义CDC采集的反序列化器
- (2)使用FlinkCDC读取配置表数据
- 3 定义广播状态
- 4 完成动态分流 -- 广播流
- (1)TableProcessFunction
- (2)主程序
- (3)测试
- 5 完成动态分流 -- 业务流
- (1)TableProcessFunction
- (2)测试
- 6 完整代码
- (1)主程序
- (2)TableProcessFunction
- (3)MyDeserializationSchemaFunction
- (4)总结
一 根据MySQL的配置表,动态进行分流
动态分流整体思路:
1 自定义反序列化器
(1)需求分析
使用FLinkCDC API方式获取到的内容如下
// 封装的数据对象类型
SourceRecord{
sourcePartition={server=mysql_binlog_source},
sourceOffset={file=mysql-bin.000004, pos=1343, row=1, snapshot=true}
}
// ConnectRecord 是 SourceRecord的父类
ConnectRecord{
topic='mysql_binlog_source.gmall2022_realtime.t_user',
kafkaPartition=null,
key=Struct{id=1},
keySchema=Schema{mysql_binlog_source.gmall2022_realtime.t_user.Key:STRUCT},
value=Struct{
after=Struct{id=1,name=zhangsan,age=18},
source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall2022_realtime,table=t_user,server_id=0,file=mysql-bin.000004,pos=1343,row=0},
op=c,ts_ms=1669896172346},
valueSchema=Schema{mysql_binlog_source.gmall2022_realtime.t_user.Envelope:STRUCT},
timestamp=null,
headers=ConnectHeaders(headers=)}
自动定义反序列化器,实现将以上信息转化为JSON格式的字符串,包换的属性包括:数据库名称,表名,操作类型,影响的数据内容。
以上内容中有价值的信息如下:
ConnectRecord{
value=Struct{
after=Struct{id=1,name=zhangsan,age=18},
source=Struct{
db=gmall2022_realtime,
table=t_user
},
op=c
},
}
(2)代码实现
/**
* 通过FlinkCDC动态读取MySQL表中的数据 -- DataStreamAPI
*
* 自定义反序列化器
*/
public class FlinkCDC03_CustomerSchema {
public static void main(String[] args) throws Exception {
//TODO 1 准备流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//TODO 3 创建Flink-MySQL-CDC的Source
Properties props = new Properties();
props.setProperty("scan.startup.mode","initial");
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("hadoop101")
.port(3306)
.username("root")
.password("123456")
// 可配置多个库
.databaseList("gmall2022_realtime")
///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
//注意:指定的时候需要使用"db.table"的方式
.tableList("gmall2022_realtime.t_user")
.debeziumProperties(props)
.deserializer(new MySchema())
.build();
//TODO 4 使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDS = env.addSource(sourceFunction);
//TODO 5 打印输出
mysqlDS.print();
//TODO 6 执行任务
env.execute();
}
}
class MySchema implements DebeziumDeserializationSchema<String> {
/**
* ConnectRecord{
* value=Struct{
* after=Struct{id=1,name=zhangsan,age=18},
* source=Struct{
* db=gmall2022_realtime,
* table=t_user
* },
* op=c
* },
* }
*/
// 反序列化
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
// 导入的是org.apache.kafka.connnect.data包
Struct valueStruct = (Struct) sourceRecord.value();
// 获取数据的来源
Struct afterStruct = valueStruct.getStruct("after");
// 获取数据库和表名的来源
Struct sourceStruct = valueStruct.getStruct("source");
// 获取数据库
String database = sourceStruct.getString("db");
// 获取表名
String table = sourceStruct.getString("table");
// 获取操作类型
// String op = valueStruct.getString("op");
String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
if(type.equals("create")){
type = "insert";
}
JSONObject jsonObj = new JSONObject();
jsonObj.put("database",database);
jsonObj.put("table",table);
jsonObj.put("type",type);
// 获取影响的数据
// 删除时,afterStruct为空
JSONObject dataJsonObj = new JSONObject();
if (afterStruct != null){
// schema获取源数据的格式,fields获取里面的各个元素
for (Field field : afterStruct.schema().fields()) {
String fieldName = field.name();
Object fieldValue = afterStruct.get(field);
dataJsonObj.put(fieldName,fieldValue);
}
}
// 删除操作会使得data属性不为空,但size为0
jsonObj.put("data",dataJsonObj);
// 向下游发送数据
collector.collect(jsonObj.toJSONString()) ;
}
// 指定类型
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
2 从配置表中读取数据
(1)自定义CDC采集的反序列化器
将CDC格式数据转换为json字符串
package com.hzy.gmall.realtime.app.fun;
public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {
// 反序列化
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
// 导入的是org.apache.kafka.connnect.data包
Struct valueStruct = (Struct) sourceRecord.value();
// 获取数据的来源
Struct afterStruct = valueStruct.getStruct("after");
// 获取数据库和表名的来源
Struct sourceStruct = valueStruct.getStruct("source");
// 获取数据库
String database = sourceStruct.getString("db");
// 获取表名
String table = sourceStruct.getString("table");
// 获取操作类型
// String op = valueStruct.getString("op");
String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
if(type.equals("create")){
type = "insert";
}
JSONObject jsonObj = new JSONObject();
jsonObj.put("database",database);
jsonObj.put("table",table);
jsonObj.put("type",type);
// 获取影响的数据
// 删除时,afterStruct为空
JSONObject dataJsonObj = new JSONObject();
if (afterStruct != null){
// schema获取源数据的格式,fields获取里面的各个元素
for (Field field : afterStruct.schema().fields()) {
String fieldName = field.name();
Object fieldValue = afterStruct.get(field);
dataJsonObj.put(fieldName,fieldValue);
}
}
// 删除操作会使得data属性不为空,但size为0
jsonObj.put("data",dataJsonObj);
// 向下游发送数据
collector.collect(jsonObj.toJSONString()) ;
}
// 指定类型
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
(2)使用FlinkCDC读取配置表数据
//TODO 6 使用FlinkCDC读取配置表数据
//获取dataSource
Properties props = new Properties();
props.setProperty("scan.startup.mode","initial");
SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder()
.hostname("hadoop101")
.port(3306)
.username("root")
.password("123456")
// 可配置多个库
.databaseList("gmall2022_realtime")
///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
//注意:指定的时候需要使用"db.table"的方式
.tableList("gmall2022_realtime.table_process")
.debeziumProperties(props)
.deserializer(new MyDeserializationSchemaFunction())
.build();
// 读取数据封装流
DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);
程序中目前存在两条流:
- 数据主流
- 配置表流
3 定义广播状态
Flink广播状态。
为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去
// 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去
// 想要使用广播状态,状态描述器只能是map
MapStateDescriptor<String, TableProcess> mapStateDescriptor =
new MapStateDescriptor<>("table_process", String.class, TableProcess.class);
BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);
// 调用非广播流的connect方法,将业务流与配置流进行连接
BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);
4 完成动态分流 – 广播流
处理广播流中的数据,广播流对状态有写的权限。
(1)TableProcessFunction
package com.hzy.gmall.realtime.app.fun;
/**
* 实现动态分流
* 目前流中有两条流中的数据,使用以下两个方法分别进行处理
*/
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
// 声明维度侧输出流标签
private OutputTag<JSONObject> dimTag;
// 声明广播状态描述器
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.dimTag = dimTag;
this.mapStateDescriptor = mapStateDescriptor;
}
// 处理业务流中的数据,maxwell从业务数据库中采集到的数据
@Override
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
}
// 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息
@Override
public void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
// 获取状态
BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
// 将json格式字符串转换为JSON对象
JSONObject jsonObj = JSONObject.parseObject(jsonStr);
// 获取配置表中的一条配置信息
// parseObject:将json格式字符串转化为json格式对象
// 第二个参数为将json字符串转化为何种格式的对象
TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);
// 业务数据库表名
String sourceTable = tableProcess.getSourceTable();
// 操作类型
String operateType = tableProcess.getOperateType();
// 数据类型 hbase -- 维度数据 kafka -- 事实数据
String sinkType = tableProcess.getSinkType();
// 指定输出目的地
String sinkTable = tableProcess.getSinkTable();
// 主键
String sinkPk = tableProcess.getSinkPk();
// 指定保留字段(列)
String sinkColumns = tableProcess.getSinkColumns();
// 指定建表扩展语句
String sinkExtend = tableProcess.getSinkExtend();
// 将配置信息放到状态中
// 拼接key
String key = sourceTable + ":" + operateType;
broadcastState.put(key,tableProcess);
}
}
(2)主程序
//TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中
//声明维度侧输出流的标记
OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};
SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
new TableProcessFunction(dimTag,mapStateDescriptor)
);
// 获取维度侧输出流
DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);
realDS.print("事实数据:");
dimDS.print("维度数据:");
(3)测试
向table_process中添加几条数据
source_table operate_type sink_type sink_table sink_columns sink_pk sink_extend
-------------- ------------ --------- ------------------ -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ------- -------------
base_trademark insert hbase dim_base_trademark id,tm_name id (NULL)
order_info update kafka dwd_order_info id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time id (NULL)
# 启动zookeeper、kafka、maxwell
读取配置表数据,反序列化,拼接成JSON,json格式如下:
将配置信息封装成tableProcess对象,如下图:
5 完成动态分流 – 业务流
(1)TableProcessFunction
处理业务流中的数据,业务流只能从状态中获取数据,无写的权限。
// 处理业务流中的数据,maxwell从业务数据库中采集到的数据
@Override
public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
String table = jsonObj.getString("table");
String type = jsonObj.getString("type");
// 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insert
if (type.equals("bootstrap-insert")){
type = "insert";
jsonObj.put("type",type);
}
// 拼接key
String key = table + ":" + type;
// 获取状态
ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
// 从状态中获取配置信息
TableProcess tableProcess = broadcastState.get(key);
if (tableProcess != null){
// 在配置表中找到了该操作对应的配置
// 判断是事实数据还是维度数据
String sinkTable = tableProcess.getSinkTable();
jsonObj.put("sink_table",sinkTable);
String sinkType = tableProcess.getSinkType();
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
// 维度数据,放到维度侧输出流中
ctx.output(dimTag,jsonObj);
}else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
// 事实数据,放到主流中
out.collect(jsonObj);
}
}else {
// 在配置表中没有该操作对应的配置
System.out.println("No This Key In TableProcess:" + key);
}
}
(2)测试
在base_trademark中添加一条数据,观察输出结果。
在order_info中修改一条数据,观察输出结果。
6 完整代码
(1)主程序
public class BaseDBApp {
public static void main(String[] args) throws Exception {
//TODO 1 基本环境准备
//流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(4);
// //TODO 2 检查点设置
// //开启检查点
// env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);
// // 设置检查点超时时间
// env.getCheckpointConfig().setCheckpointTimeout(60000L);
// // 设置重启策略
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
// // 设置job取消后,检查点是否保留
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
// // 指定操作HDFS的用户
// System.setProperty("HADOOP_USER_NAME","hzy");
//TODO 3 从kafka中读取数据
//声明消费的主题以及消费者组
String topic = "ods_base_db_m";
String groupId = "base_db_app_group";
// 获取消费者对象
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
// 读取数据,封装成流
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//TODO 4 对数据类型进行转换 String -> JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);
//TODO 5 简单的ETL
SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(
new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject jsonobj) throws Exception {
boolean flag =
jsonobj.getString("table") != null &&
jsonobj.getString("table").length() > 0 &&
jsonobj.getJSONObject("data") != null &&
jsonobj.getString("data").length() > 3;
return flag;
}
}
);
// filterDS.print("<<<");
//TODO 6 使用FlinkCDC读取配置表数据
//获取dataSource
Properties props = new Properties();
props.setProperty("scan.startup.mode","initial");
SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder()
.hostname("hadoop101")
.port(3306)
.username("root")
.password("123456")
// 可配置多个库
.databaseList("gmall2022_realtime")
///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
//注意:指定的时候需要使用"db.table"的方式
.tableList("gmall2022_realtime.table_process")
.debeziumProperties(props)
.deserializer(new MyDeserializationSchemaFunction())
.build();
// 读取数据封装流
DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);
// 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去
// 想要使用广播状态,状态描述器只能是map,使用map状态存储
MapStateDescriptor<String, TableProcess> mapStateDescriptor =
new MapStateDescriptor<>("table_process", String.class, TableProcess.class);
BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);
// 调用非广播流的connect方法,将业务流与配置流进行连接
BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);
//TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中
//声明维度侧输出流的标记
OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};
SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
new TableProcessFunction(dimTag,mapStateDescriptor)
);
// 获取维度侧输出流
DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);
realDS.print("事实数据:");
dimDS.print("维度数据:");
//TODO 8 将维度侧输出流的数据写到Hbase中
//TODO 9 将主流数据写回kafka的dwd层
env.execute();
}
}
(2)TableProcessFunction
/**
* 实现动态分流
* 目前流中有两条流中的数据,使用以下两个方法分别进行处理
*/
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
// 声明维度侧输出流标签
private OutputTag<JSONObject> dimTag;
// 声明广播状态描述器
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.dimTag = dimTag;
this.mapStateDescriptor = mapStateDescriptor;
}
// 处理业务流中的数据,maxwell从业务数据库中采集到的数据
@Override
public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
String table = jsonObj.getString("table");
String type = jsonObj.getString("type");
// 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insert
if (type.equals("bootstrap-insert")){
type = "insert";
jsonObj.put("type",type);
}
// 拼接key
String key = table + ":" + type;
// 获取状态
ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
// 从状态中获取配置信息
TableProcess tableProcess = broadcastState.get(key);
if (tableProcess != null){
// 在配置表中找到了该操作对应的配置
// 判断是事实数据还是维度数据
String sinkTable = tableProcess.getSinkTable();
jsonObj.put("sink_table",sinkTable);
String sinkType = tableProcess.getSinkType();
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
// 维度数据,放到维度侧输出流中
ctx.output(dimTag,jsonObj);
}else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
// 事实数据,放到主流中
out.collect(jsonObj);
}
}else {
// 在配置表中没有该操作对应的配置
System.out.println("No This Key In TableProcess:" + key);
}
}
// 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息
@Override
public void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
// 获取状态
BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
// 将json格式字符串转换为JSON对象
JSONObject jsonObj = JSONObject.parseObject(jsonStr);
// 获取配置表中的一条配置信息
// parseObject:将json格式字符串转化为json格式对象
// 第二个参数为将json字符串转化为何种格式的对象
TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);
// 业务数据库表名
String sourceTable = tableProcess.getSourceTable();
// 操作类型
String operateType = tableProcess.getOperateType();
// 数据类型 hbase -- 维度数据 kafka -- 事实数据
String sinkType = tableProcess.getSinkType();
// 指定输出目的地
String sinkTable = tableProcess.getSinkTable();
// 主键
String sinkPk = tableProcess.getSinkPk();
// 指定保留字段(列)
String sinkColumns = tableProcess.getSinkColumns();
// 指定建表扩展语句
String sinkExtend = tableProcess.getSinkExtend();
// 将配置信息放到状态中
// 拼接key
String key = sourceTable + ":" + operateType;
broadcastState.put(key,tableProcess);
}
}
(3)MyDeserializationSchemaFunction
public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {
// 反序列化
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
// 导入的是org.apache.kafka.connnect.data包
Struct valueStruct = (Struct) sourceRecord.value();
// 获取数据的来源
Struct afterStruct = valueStruct.getStruct("after");
// 获取数据库和表名的来源
Struct sourceStruct = valueStruct.getStruct("source");
// 获取数据库
String database = sourceStruct.getString("db");
// 获取表名
String table = sourceStruct.getString("table");
// 获取操作类型
// String op = valueStruct.getString("op");
String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
if(type.equals("create")){
type = "insert";
}
JSONObject jsonObj = new JSONObject();
jsonObj.put("database",database);
jsonObj.put("table",table);
jsonObj.put("type",type);
// 获取影响的数据
// 删除时,afterStruct为空
JSONObject dataJsonObj = new JSONObject();
if (afterStruct != null){
// schema获取源数据的格式,fields获取里面的各个元素
for (Field field : afterStruct.schema().fields()) {
String fieldName = field.name();
Object fieldValue = afterStruct.get(field);
dataJsonObj.put(fieldName,fieldValue);
}
}
// 删除操作会使得data属性不为空,但size为0
jsonObj.put("data",dataJsonObj);
// 向下游发送数据
collector.collect(jsonObj.toJSONString()) ;
}
// 指定类型
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
(4)总结
目前已完成功能:
- 基本环境准备
- 读取主流业务数据
- 使用CDC读取配置表中数据
- 将读取的配置数据流转换为广播流
- 主流和广播流进行连接
- 对连接之后的流进行处理(动态分流 – TableProcessFunction)
- 处理广播流的方法 – processBroadcastElement
- 获取状态
- 从广播流中获取配置信息,将配置信息放到状态中
- 处理主流的方法 – processElement
- 获取状态
- 从主流中获取表以及操作类型,根据表和操作类型封装key
- 通过key到状态中获取配置对象
- 根据配置对象的sinkType进行分流
- 处理广播流的方法 – processBroadcastElement