1 Overview
Key design points for the DIM layer:
(1) The DIM layer is designed according to dimensional modeling theory and stores the dimension tables of the dimensional model.
(2) DIM-layer data is stored in HBase tables. DIM tables are used for dimension association: related dimension information is looked up by primary key, a scenario in which K-V databases are efficient. Common K-V databases are Redis and HBase; Redis keeps its data in memory and would put considerable pressure on it, so HBase is chosen to store the dimension data.
(3) DIM-layer table names follow the convention dim_<source table name>.
2 Environment
3 Approach
4 Configuration Table
The task of this layer is to write business data directly into different HBase tables. How does the program know which records in the stream are dimension data, and which HBase table each record should go to? To solve this, we build a configuration table in MySQL and read it into the program with Flink CDC.
Implementation: in MySQL, create a database rosh_config with binlog enabled (a minimal binlog sketch follows the DDL below) and create the table table_process:
CREATE TABLE `table_process` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`source_table` varchar(200) DEFAULT NULL COMMENT 'source table',
`sink_table` varchar(200) DEFAULT NULL COMMENT 'sink table',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT 'sink columns',
`sink_pk` varchar(200) DEFAULT NULL COMMENT 'primary key column',
`sink_extend` varchar(200) DEFAULT NULL COMMENT 'table-creation extension',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
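Flink CDC (Debezium) and Maxwell both read the MySQL binlog, which must be enabled in ROW format. A minimal my.cnf sketch (the server-id and log base name below are placeholders, adjust to your environment):
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW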
5 Flink CDC Data Format
Test code:
public class FlinkCDCTest {
public static void main(String[] args) throws Exception {
//Get the execution environment; in production, set the parallelism to the number of Kafka topic partitions
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//Build the Flink CDC MySQL source
MySqlSource<String> mySqlSource = MySqlSource
.<String>builder().hostname(MySqlConstant.HOST_NAME)
.port(MySqlConstant.PORT)
.username(MySqlConstant.USER_NAME)
.password(MySqlConstant.PASSWORD)
.databaseList("rosh_config")
.tableList("rosh_config.table_process")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");
mysqlSourceDS.print(">>>>>>>>>>");
env.execute();
}
}
Collected data:
#Data read at startup (initial snapshot)
{"before":null,"after":{"id":25,"source_table":"base_trademark","sink_table":"dim_base_trademark","sink_columns":"id,tm_name","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674027465674,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1674027465678,"transaction":null}
#Data read when a row is inserted
INSERT INTO `rosh_config`.`table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('test_insert', 'test_insert', NULL, NULL, NULL);
{"before":null,"after":{"id":27,"source_table":"test_insert","sink_table":"test_insert","sink_columns":null,"sink_pk":null,"sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674027661000,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":23651,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1674027661432,"transaction":null}
#Data read when a row is updated
UPDATE table_process SET source_table = 'test_update' , sink_table = 'test_update' WHERE id = 27
{"before":{"id":27,"source_table":"test_insert","sink_table":"test_insert","sink_columns":null,"sink_pk":null,"sink_extend":null},"after":{"id":27,"source_table":"test_update","sink_table":"test_update","sink_columns":null,"sink_pk":null,"sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674027834000,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":23974,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1674027835147,"transaction":null}
#Data read when a row is deleted
delete from table_process where id = 27
{"before":{"id":27,"source_table":"test_update","sink_table":"test_update","sink_columns":null,"sink_pk":null,"sink_extend":null},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674027889000,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":24333,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1674027889604,"transaction":null}
Analysis:
For rows read at startup (snapshot) and newly inserted rows, the before field is null; when before is null, call create table if not exists.
When both before and after are non-null, the row was updated. Updates to the table definition are not supported (altering tables through Phoenix is too cumbersome), so no alter is performed and create table if not exists is called here as well.
When before is non-null and after is null, the row was deleted, and the corresponding Phoenix table must be dropped.
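A minimal sketch of this branching, assuming the Debezium JSON has already been parsed with fastjson; createTableIfNotExists and dropTable are placeholders for the Phoenix DDL calls implemented later in TableProcessFunction:
JSONObject json = JSON.parseObject(cdcRecord);
String before = json.getString("before");
String after = json.getString("after");
if (StringUtils.isNotBlank(after)) {
    //snapshot read ("r"), insert ("c") or update ("u"): (re)create the target table
    createTableIfNotExists(JSON.parseObject(after, TableProcess.class));
} else {
    //delete ("d"): after is null, drop the table described by before
    dropTable(JSON.parseObject(before, TableProcess.class));
}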
6 Maxwell Business Data Format
#Deletes are ignored for now; a delete does not affect dimension lookups
#Start a Kafka console consumer
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic db_topic
#Full synchronization (bootstrap)
bin/maxwell-bootstrap --database rosh_mall --table base_region --config ./config.properties
{"database":"rosh_mall","table":"base_region","type":"bootstrap-start","ts":1674030980,"data":{}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"1","region_name":"华北"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"2","region_name":"华东"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"3","region_name":"东北"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"4","region_name":"华中"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"5","region_name":"华南"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"6","region_name":"西南"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"7","region_name":"西北"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-complete","ts":1674030980,"data":{}}
#Insert
{"database":"rosh_mall","table":"base_region","type":"insert","ts":1674031009,"xid":40967,"commit":true,"data":{"id":"10","region_name":"test"}}
#Update
{"database":"rosh_mall","table":"base_region","type":"update","ts":1674031027,"xid":41011,"commit":true,"data":{"id":"10","region_name":"update"},"old":{"id":null,"region_name":"test"}}
#Conclusion
The dimension table must be updated when type is bootstrap-insert, insert, or update (bootstrap-start and bootstrap-complete carry no data).
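A minimal sketch of that filter; the type strings are the ones Maxwell emits above, and the full version appears in DimApp below:
String type = jsonObject.getString("type");
boolean dimChange = "insert".equals(type)
        || "update".equals(type)
        || "bootstrap-insert".equals(type); //rows produced by maxwell-bootstrap
if (dimChange) {
    out.collect(jsonObject);
}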
7 Create the Schema with Phoenix
#Create the schema with Phoenix
cd /usr/local/phoenix-hbase/bin
./sqlline.py
create schema ROSH_MALL;
8 Code Implementation
8.1 Project
8.2 pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rosh-mall-reatime</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink cdc-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Without a flink-table dependency the job fails with: Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter. Adding the dependency below resolves it (certain other flink-table dependencies work as well). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!-- Required if checkpoints are stored on HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!-- Flink logs through slf4j, which is only a logging facade; here logback is used as the concrete implementation -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!--phoenix-->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Utility libraries -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<!-- Factory classes from connector and format dependencies would otherwise overwrite each other during packaging; the ServicesResourceTransformer merges them -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
8.3 utils
package com.rosh.mall.utils;
import com.alibaba.druid.pool.DruidDataSource;
import com.rosh.mall.constant.PhoenixConstant;
public class DruidPhoenixUtils {
private DruidPhoenixUtils() {
}
private static DruidDataSource druidDataSource;
static {
initialDataSource();
}
private static void initialDataSource() {
//Create the data source
druidDataSource = new DruidDataSource();
// Set the fully-qualified driver class name
druidDataSource.setDriverClassName(PhoenixConstant.PHOENIX_DRIVER);
// Set the JDBC connection URL
druidDataSource.setUrl(PhoenixConstant.PHOENIX_SERVER);
// Number of connections created when the pool is initialized
druidDataSource.setInitialSize(5);
// Maximum number of active connections
druidDataSource.setMaxActive(20);
// Minimum number of idle connections; must be between 0 and maxActive, default 0
druidDataSource.setMinIdle(1);
// How long to wait when no connection is free; a timeout throws an exception, -1 means wait forever
druidDataSource.setMaxWait(-1);
// SQL statement used to validate that a connection is usable
druidDataSource.setValidationQuery("select 1");
// Whether idle connections are checked by the eviction thread (if any); connections that fail the check are removed from the pool
// Note: the default is true, and an error is raised if validationQuery is not set:
// testWhileIdle is true, validationQuery not set
druidDataSource.setTestWhileIdle(true);
// Whether to validate a connection when it is borrowed; false, because validating here hurts performance
druidDataSource.setTestOnBorrow(false);
// Whether to validate a connection when it is returned
druidDataSource.setTestOnReturn(false);
// Run the idle-connection evictor every 30 s
druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
// Evict connections that have been idle for 30 min (which is also the default)
druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);
}
public static DruidDataSource getDruidDataSource() {
return druidDataSource;
}
}
package com.rosh.mall.utils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class EnviromentUtils {
private EnviromentUtils() {
}
/**
* Configure checkpointing and the restart strategy
*/
public static void setCheckpoint(StreamExecutionEnvironment env) {
//Enable exactly-once checkpointing every 5 minutes
env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
//Set the state backend
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop1:8020/flink/ck");
//Checkpoint timeout: 10 minutes
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
//Allow at most 2 concurrent checkpoints
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//Restart at most 3 times, waiting 5 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5 * 1000L));
//Set the HDFS user
System.setProperty("HADOOP_USER_NAME", "root");
}
}
package com.rosh.mall.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
public class KafkaUtils {
private KafkaUtils() {
}
public static final String DB_TOPIC = "db_topic";
public static final String GROUP_ID = "dim_app_group";
private static final String KAFKA_SERVER = "hadoop2:9092";
public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaConsumer<>(topic,
new KafkaDeserializationSchema<String>() {
@Override
public boolean isEndOfStream(String s) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if (record == null || record.value() == null) {
return "";
} else {
return new String(record.value());
}
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
},
properties);
}
}
package com.rosh.mall.utils;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.constant.PhoenixConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Set;
@Slf4j
public class PhoenixUtil {
private PhoenixUtil() {
}
/**
* Upsert data into Phoenix
*
* @param connection pooled connection
* @param sinkTable target table name
* @param data row data
*/
public static void upsertValues(DruidPooledConnection connection, String sinkTable, JSONObject data) throws SQLException {
// Build the upsert SQL
// upsert into db.tn(id,name,sex) values('1','zhangsan','male')
Set<String> columns = data.keySet();
Collection<Object> values = data.values();
String sqlSb = "upsert into " + PhoenixConstant.HBASE_SCHEMA + "." + sinkTable + "(" +
StringUtils.join(columns, ",") + ") values ( '" +
StringUtils.join(values, "','") + "')";
//Prepare the statement
PreparedStatement preparedStatement = connection.prepareStatement(sqlSb);
preparedStatement.execute();
connection.commit();
//Release resources
preparedStatement.close();
}
}
8.4 constant
package com.rosh.mall.constant;
public class MaxwellConstant {
private MaxwellConstant() {
}
public static final String TYPE = "type";
public static final String INSERT_TYPE = "insert";
public static final String UPDATE_TYPE = "update";
public static final String DELETE_TYPE = "delete";
public static final String INITIAL_TYPE = "bootstrap-insert";
}
package com.rosh.mall.constant;
public class MySqlConstant {
private MySqlConstant() {
}
public static final String HOST_NAME = "hadoop2";
public static final Integer PORT = 3306;
public static final String USER_NAME = "root";
public static final String PASSWORD = "123456";
}
package com.rosh.mall.constant;
public class PhoenixConstant {
private PhoenixConstant() {
}
/**
* Phoenix schema name
*/
public static final String HBASE_SCHEMA = "ROSH_MALL";
/**
* Phoenix JDBC driver
*/
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
/**
* Phoenix connection URL
*/
public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop2:2181";
}
8.5 bean
package com.rosh.mall.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class TableProcess {
/**
* id
*/
private Long id;
/**
* Source table
*/
private String sourceTable;
/**
* Sink table
*/
private String sinkTable;
/**
* Sink columns
*/
private String sinkColumns;
/**
* Primary key column
*/
private String sinkPk;
/**
* Table-creation extension clause
*/
private String sinkExtend;
}
8.6 func
package com.rosh.mall.app.func;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.utils.DruidPhoenixUtils;
import com.rosh.mall.utils.PhoenixUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@Slf4j
public class DimSinkFunction extends RichSinkFunction<JSONObject> {
/**
* Write each dimension record to its Phoenix table
*/
@Override
public void invoke(JSONObject jsonObject, Context context) throws Exception {
//Get a connection from the pool
DruidPooledConnection connection = DruidPhoenixUtils.getDruidDataSource().getConnection();
//Write the record
String sinkTable = jsonObject.getString("sinkTable");
JSONObject data = jsonObject.getJSONObject("data");
PhoenixUtil.upsertValues(connection, sinkTable, data);
//Return the connection to the pool
connection.close();
}
}
package com.rosh.mall.app.func;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.bean.TableProcess;
import com.rosh.mall.constant.PhoenixConstant;
import com.rosh.mall.utils.DruidPhoenixUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Slf4j
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private final MapStateDescriptor<String, TableProcess> mapStateDescriptor;
public TableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.mapStateDescriptor = mapStateDescriptor;
}
/**
* Process the main (business) stream
*/
@Override
public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext context, Collector<JSONObject> collector) throws Exception {
//Read the broadcast state
ReadOnlyBroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
String table = jsonObject.getString("table");
TableProcess tableProcess = broadcastState.get(table);
//The current record belongs to a dimension table
if (tableProcess != null) {
//Keep only the configured sink columns
filterColumn(jsonObject.getJSONObject("data"), tableProcess.getSinkColumns());
//Attach sinkTable and emit downstream
jsonObject.put("sinkTable", tableProcess.getSinkTable());
collector.collect(jsonObject);
}
}
private void filterColumn(JSONObject data, String sinkColumns) {
List<String> filedList = Arrays.asList(sinkColumns.split(","));
Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Object> map = iterator.next();
if (!filedList.contains(map.getKey())) {
iterator.remove();
}
}
}
/**
* Process the broadcast (config) stream
* Data format: {"before":null,"after":{"id":1,"source_table":"activity_info","sink_table":"dim_activity_info","sink_columns":"id,activity_name,activity_type,activity_desc,start_time,end_time,create_time","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674004205649,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1674004205653,"transaction":null}
*/
@Override
public void processBroadcastElement(String value, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {
//Parse the config record
JSONObject jsonObject = JSON.parseObject(value);
String after = jsonObject.getString("after");
String before = jsonObject.getString("before");
if (StringUtils.isNotBlank(after)) {
//Snapshot read, insert or update: call create table if not exists
TableProcess tableProcess = JSON.parseObject(after, TableProcess.class);
//Create the Phoenix table if it does not exist yet
checkTable(tableProcess);
//Put the config into the broadcast state
BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
broadcastState.put(tableProcess.getSourceTable(), tableProcess);
} else {
//Delete: the config row was removed
TableProcess tableProcess = JSON.parseObject(before, TableProcess.class);
//Drop the Phoenix table
deleteTable(tableProcess);
//Remove the config from the broadcast state
BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
broadcastState.remove(tableProcess.getSourceTable());
}
}
private void deleteTable(TableProcess tableProcess) {
String sinkTable = tableProcess.getSinkTable();
String deleteSql = "drop table " + PhoenixConstant.HBASE_SCHEMA + "." + sinkTable;
log.info("TableProcessFunction invoke deleteTable deleteSql:{}", deleteSql);
PreparedStatement preparedStatement = null;
DruidPooledConnection connection = null;
try {
//Get a connection
connection = DruidPhoenixUtils.getDruidDataSource().getConnection();
//Execute the DDL
preparedStatement = connection.prepareStatement(deleteSql);
preparedStatement.execute();
//Return the connection
connection.close();
} catch (SQLException e) {
log.error("TableProcessFunction invoke deleteTable failed ", e);
throw new RuntimeException(e);
} finally {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
private void checkTable(TableProcess tableProcess) {
//Apply defaults for optional config fields
String sinkPk = tableProcess.getSinkPk();
String sinkExtend = tableProcess.getSinkExtend();
if (StringUtils.isBlank(sinkPk)) {
sinkPk = "id";
}
if (StringUtils.isBlank(sinkExtend)) {
sinkExtend = "";
}
//Build the DDL: create table if not exists db.tn(id varchar primary key,bb varchar,cc varchar)
StringBuilder sqlSb = new StringBuilder("create table if not exists ")
.append(PhoenixConstant.HBASE_SCHEMA)
.append(".")
.append(tableProcess.getSinkTable())
.append("(");
String sinkColumns = tableProcess.getSinkColumns();
String[] columns = sinkColumns.split(",");
for (int i = 0; i < columns.length; i++) {
//Current column
String column = columns[i];
if (sinkPk.equals(column)) {
sqlSb.append(column).append(" varchar primary key");
} else {
sqlSb.append(column).append(" varchar");
}
if (i == columns.length - 1) {
sqlSb.append(")");
} else {
sqlSb.append(",");
}
}
sqlSb.append(sinkExtend);
log.info("TableProcessFunction invoke checkTable sql:{}", sqlSb);
//Prepare the statement, create the table, release resources
PreparedStatement preparedStatement = null;
try {
//Get a connection
DruidPooledConnection connection = DruidPhoenixUtils.getDruidDataSource().getConnection();
//Create the table
preparedStatement = connection.prepareStatement(sqlSb.toString());
preparedStatement.execute();
//Return the connection
connection.close();
} catch (Exception e) {
log.error("TableProcessFunction invoke checkTable failed ", e);
throw new RuntimeException("Failed to create table");
} finally {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
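To make the generated SQL concrete: for the base_trademark config row used in the test section below (sink_columns = id,tm_name, sink_pk = id), checkTable and PhoenixUtil.upsertValues produce roughly the following statements (column order may vary with the JSON key order):
create table if not exists ROSH_MALL.dim_base_trademark(id varchar primary key,tm_name varchar)
upsert into ROSH_MALL.dim_base_trademark(id,tm_name) values ( '14','rosh测试')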
8.7 DimApp
package com.rosh.mall.app.dim;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.app.func.DimSinkFunction;
import com.rosh.mall.app.func.TableProcessFunction;
import com.rosh.mall.bean.TableProcess;
import com.rosh.mall.constant.MaxwellConstant;
import com.rosh.mall.constant.MySqlConstant;
import com.rosh.mall.utils.EnviromentUtils;
import com.rosh.mall.utils.KafkaUtils;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
@Slf4j
public class DimApp {
public static void main(String[] args) throws Exception {
//Get the execution environment; in production set the parallelism to the number of Kafka topic partitions
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//Enable checkpointing and the state backend; enabled in production, disabled for local testing
//EnviromentUtils.setCheckpoint(env);
//Create the source stream from the Kafka db_topic
DataStreamSource<String> kafkaDS = env.addSource(KafkaUtils.getFlinkKafkaConsumer(KafkaUtils.DB_TOPIC, KafkaUtils.GROUP_ID));
//Keep only the Maxwell record types that affect dimension tables
SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
@Override
public void flatMap(String str, Collector<JSONObject> out) throws Exception {
try {
JSONObject jsonObject = JSON.parseObject(str);
String type = jsonObject.getString("type");
if (MaxwellConstant.INSERT_TYPE.equals(type) || MaxwellConstant.UPDATE_TYPE.equals(type) || MaxwellConstant.INITIAL_TYPE.equals(type)) {
out.collect(jsonObject);
}
} catch (Exception e) {
log.info("DimApp flatmap 错误日志:{}", str);
}
}
});
//Broadcast stream
//Flink CDC source that reads the MySQL configuration table
MySqlSource<String> mySqlSource = MySqlSource
.<String>builder().hostname(MySqlConstant.HOST_NAME)
.port(MySqlConstant.PORT)
.username(MySqlConstant.USER_NAME)
.password(MySqlConstant.PASSWORD)
.databaseList("rosh_config")
.tableList("rosh_config.table_process")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);
//Connect the main stream with the broadcast stream
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjectDS.connect(broadcastStream);
//Process the connected stream: route main-stream records according to the config
SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));
//Write the dimension data to Phoenix
dimDS.addSink(new DimSinkFunction());
//Launch the job
env.execute("DimApp");
}
}
9 Testing
9.1 Insert a row into the configuration table
INSERT INTO `rosh_config`.`table_process`( `source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ( 'base_trademark', 'dim_base_trademark', 'id,tm_name', 'id', NULL);
Check the dimension table:
9.2 Insert rows into the business database
INSERT INTO `rosh_mall`.`base_trademark` ( `id`, `tm_name`, `logo_url` ) VALUES ( 14, 'rosh测试', '/static/default.jpg' );
INSERT INTO `rosh_mall`.`base_trademark` ( `id`, `tm_name`, `logo_url` ) VALUES ( 15, 'rosh测试1', '/static/default.jpg' );
Check the dimension table:
select * from ROSH_MALL.DIM_BASE_TRADEMARK;
9.3 Delete a row from the configuration table
delete from table_process where id = 29
Check the dimension table: