Implementing the DIM Layer with Flink Real-Time Computing


1 Overview

Key points of the DIM layer design:
(1) The DIM layer is designed according to dimensional modeling theory; it stores the dimension tables of the dimensional model.
(2) DIM layer data is stored in HBase. DIM tables are used for dimension lookups, where related attributes are fetched by primary key, a workload that key-value stores handle well. Common K-V databases are Redis and HBase; Redis keeps all of its data in memory and would put heavy pressure on it, so HBase is chosen to store the dimension data.
(3) DIM layer tables are named dim_<source table name>.

2 Environment

(environment screenshot)

3 Overall Design

(overall design diagram)

4 Configuration Table

The task of this layer is to write business data directly into different HBase tables. How does the program know which records in the stream are dimension data, and which HBase table each record should be written to? To solve this, we create a configuration table in MySQL and read it into the program with Flink CDC.
Implementation: create the database rosh_config in MySQL, enable binlog for it, and create the table table_process:

CREATE TABLE `table_process` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `source_table` varchar(200) DEFAULT NULL COMMENT 'source table',
  `sink_table` varchar(200) DEFAULT NULL COMMENT 'sink table',
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT 'sink columns',
  `sink_pk` varchar(200) DEFAULT NULL COMMENT 'primary key column',
  `sink_extend` varchar(200) DEFAULT NULL COMMENT 'table-creation extension',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


5 Flink CDC Data Format

Test code:

import com.rosh.mall.constant.MySqlConstant;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCTest {

    public static void main(String[] args) throws Exception {

        //Get the execution environment; in production set the parallelism to the number of Kafka topic partitions
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //Build the Flink CDC MySQL source for the configuration table
        MySqlSource<String> mySqlSource = MySqlSource
                .<String>builder().hostname(MySqlConstant.HOST_NAME)
                .port(MySqlConstant.PORT)
                .username(MySqlConstant.USER_NAME)
                .password(MySqlConstant.PASSWORD)
                .databaseList("rosh_config")
                .tableList("rosh_config.table_process")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");

        mysqlSourceDS.print(">>>>>>>>>>");

        env.execute();

    }
} 

Collected data:


# Records read during the initial snapshot
{"before":null,"after":{"id":25,"source_table":"base_trademark","sink_table":"dim_base_trademark","sink_columns":"id,tm_name","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674027465674,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1674027465678,"transaction":null}

# Record produced when a row is inserted
INSERT INTO `rosh_config`.`table_process`(`source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('test_insert', 'test_insert', NULL, NULL, NULL);
{"before":null,"after":{"id":27,"source_table":"test_insert","sink_table":"test_insert","sink_columns":null,"sink_pk":null,"sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674027661000,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":23651,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1674027661432,"transaction":null}

# Record produced when a row is updated
UPDATE table_process SET source_table = 'test_update' , sink_table = 'test_update' WHERE id = 27
{"before":{"id":27,"source_table":"test_insert","sink_table":"test_insert","sink_columns":null,"sink_pk":null,"sink_extend":null},"after":{"id":27,"source_table":"test_update","sink_table":"test_update","sink_columns":null,"sink_pk":null,"sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674027834000,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":23974,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1674027835147,"transaction":null}

# Record produced when a row is deleted
delete from table_process where id = 27
{"before":{"id":27,"source_table":"test_update","sink_table":"test_update","sink_columns":null,"sink_pk":null,"sink_extend":null},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674027889000,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":24333,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1674027889604,"transaction":null}

Data analysis:

For the configuration rows read during the initial snapshot and for newly inserted rows, the before field is null; when before is null, call create table if not exists.
When both before and after are non-null, the row was updated. We do not propagate modifications (altering tables through Phoenix is too cumbersome), i.e. nothing structural is changed, and create table if not exists is called as well.
When before is non-null and after is null, the row was deleted, and the corresponding table must be dropped (drop table).
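The branching described above can be sketched as a small helper; the class and method names here are illustrative only, and the production version is TableProcessFunction.processBroadcastElement in section 8.6:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;

public class CdcEventClassifier {

    /**
     * Classify one Flink CDC (Debezium JSON) change event by its before/after fields.
     */
    public static String classify(String value) {
        JSONObject record = JSON.parseObject(value);
        boolean hasAfter = StringUtils.isNotBlank(record.getString("after"));
        boolean hasBefore = StringUtils.isNotBlank(record.getString("before"));
        if (hasAfter) {
            // snapshot read ("r"), insert ("c") or update ("u") -> create table if not exists
            return "ensure-table";
        }
        if (hasBefore) {
            // delete ("d") -> drop table
            return "drop-table";
        }
        return "ignore";
    }
}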

6 Maxwell Business Data Format


# Deletes are ignored for now; deleting a business row does not break dimension lookups


# Start a Kafka console consumer
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic db_topic


# Full synchronization (bootstrap)
bin/maxwell-bootstrap --database rosh_mall --table base_region --config ./config.properties

{"database":"rosh_mall","table":"base_region","type":"bootstrap-start","ts":1674030980,"data":{}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"1","region_name":"华北"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"2","region_name":"华东"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"3","region_name":"东北"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"4","region_name":"华中"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"5","region_name":"华南"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"6","region_name":"西南"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-insert","ts":1674030980,"data":{"id":"7","region_name":"西北"}}
{"database":"rosh_mall","table":"base_region","type":"bootstrap-complete","ts":1674030980,"data":{}}


# Record produced when a row is inserted
{"database":"rosh_mall","table":"base_region","type":"insert","ts":1674031009,"xid":40967,"commit":true,"data":{"id":"10","region_name":"test"}}

# Record produced when a row is updated
{"database":"rosh_mall","table":"base_region","type":"update","ts":1674031027,"xid":41011,"commit":true,"data":{"id":"10","region_name":"update"},"old":{"id":null,"region_name":"test"}}



# Conclusion
When type is bootstrap-insert, insert, or update, the dimension table data must be updated
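DimApp (section 8.7) applies exactly this check inside its flatMap; as a standalone illustration (the class name is made up for this sketch):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class MaxwellTypeFilter {

    /**
     * Return true for the Maxwell event types that should update a dimension table.
     */
    public static boolean isDimUpdate(String message) {
        JSONObject jsonObject = JSON.parseObject(message);
        String type = jsonObject.getString("type");
        return "bootstrap-insert".equals(type)
                || "insert".equals(type)
                || "update".equals(type);
    }
}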

7 Creating the Schema with Phoenix

# Create the schema with Phoenix

cd /usr/local/phoenix-hbase/bin
./sqlline.py

create schema ROSH_MALL;
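To confirm the schema is reachable from Java, a throwaway JDBC check can be run against the same ZooKeeper quorum; a minimal sketch, assuming the jdbc:phoenix:hadoop2:2181 URL configured later in PhoenixConstant (the class name and query are only for illustration):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class PhoenixSmokeTest {

    public static void main(String[] args) throws Exception {
        // connect through the Phoenix JDBC driver (ZooKeeper quorum in the URL)
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:hadoop2:2181");
             ResultSet rs = conn.createStatement()
                     .executeQuery("select distinct table_schem from system.catalog")) {
            // the ROSH_MALL schema created above should be listed
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}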

8 Code Implementation

8.1 Project Structure

(project structure screenshot)

8.2 pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rosh-mall-reatime</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--flink cdc-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!-- Without a flink-table related dependency the job fails with: Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter. Adding the dependency below fixes this (certain other flink-table dependencies also work). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!-- Needed when checkpoints are stored on HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <!-- Flink logs through SLF4J, which is only a logging facade; logback-classic is used here as the concrete implementation -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>


        <!--phoenix-->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- utility libraries -->
        <dependency>
            <groupId>commons-fileupload</groupId>
            <artifactId>commons-fileupload</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.16</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <transformers combine.children="append">
                                <!-- The ServicesResourceTransformer merges META-INF/services files, so that connector and format factory files do not overwrite one another -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

8.3 utils

package com.rosh.mall.utils;

import com.alibaba.druid.pool.DruidDataSource;
import com.rosh.mall.constant.PhoenixConstant;

public class DruidPhoenixUtils {


    private DruidPhoenixUtils() {

    }

    private static DruidDataSource druidDataSource;

    static {
        initialDataSource();
    }


    private static void initialDataSource() {
        // create the data source
        druidDataSource = new DruidDataSource();
        // fully-qualified driver class name
        druidDataSource.setDriverClassName(PhoenixConstant.PHOENIX_DRIVER);
        // connection url
        druidDataSource.setUrl(PhoenixConstant.PHOENIX_SERVER);
        // number of connections created when the pool is initialized
        druidDataSource.setInitialSize(5);
        // maximum number of simultaneously active connections
        druidDataSource.setMaxActive(20);
        // minimum number of idle connections, between 0 and maxActive (default 0)
        druidDataSource.setMinIdle(1);
        // how long to wait when no connection is free; throws on timeout, -1 means wait forever
        druidDataSource.setMaxWait(-1);
        // SQL statement used to validate connections
        druidDataSource.setValidationQuery("select 1");
        // let the idle-connection evictor (if any) validate connections; failed connections are removed from the pool
        // note: defaults to true and fails if validationQuery is not set
        // testWhileIdle is true, validationQuery not set
        druidDataSource.setTestWhileIdle(true);
        // do not validate on borrow; validating here would hurt performance
        druidDataSource.setTestOnBorrow(false);
        // do not validate on return
        druidDataSource.setTestOnReturn(false);
        // run the idle-connection evictor every 30 s
        druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
        // evict connections idle for more than 30 min (which is also the default)
        druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);
    }

    public static DruidDataSource getDruidDataSource() {
        return druidDataSource;
    }


}
package com.rosh.mall.utils;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnviromentUtils {

    private EnviromentUtils() {

    }


    /**
     * Configure checkpointing
     */
    public static void setCheckpoint(StreamExecutionEnvironment env) {

        //enable exactly-once checkpointing every 5 minutes
        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        //set the state backend and checkpoint storage
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop1:8020/flink/ck");
        //checkpoint timeout: 10 minutes
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
        //allow at most 2 concurrent checkpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //restart up to 3 times, waiting 5 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5 * 1000L));

        //HDFS user
        System.setProperty("HADOOP_USER_NAME", "root");

    }


}

package com.rosh.mall.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Properties;

public class KafkaUtils {

    private KafkaUtils() {

    }

    public static final String DB_TOPIC = "db_topic";

    public static final String GROUP_ID = "dim_app_group";
    private static final String KAFKA_SERVER = "hadoop2:9092";


    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        return new FlinkKafkaConsumer<>(topic,
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public boolean isEndOfStream(String s) {
                        return false;
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null) {
                            return "";
                        } else {
                            return new String(record.value());
                        }
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                },
                properties);

    }


}

package com.rosh.mall.utils;

import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.constant.PhoenixConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Set;

@Slf4j
public class PhoenixUtil {

    private PhoenixUtil() {

    }

    /**
     * Upsert one row into Phoenix.
     *
     * @param connection pooled connection
     * @param sinkTable  target table name
     * @param data       row data
     */
    public static void upsertValues(DruidPooledConnection connection, String sinkTable, JSONObject data) throws SQLException {


        // build the upsert statement, e.g.
        // upsert into db.tn(id,name,sex) values('1','zhangsan','male')
        Set<String> columns = data.keySet();
        Collection<Object> values = data.values();

        String sqlSb = "upsert into " + PhoenixConstant.HBASE_SCHEMA + "." + sinkTable + "(" +
                StringUtils.join(columns, ",") + ") values ( '" +
                StringUtils.join(values, "','") + "')";



        //prepare and execute the statement, then commit
        PreparedStatement preparedStatement = connection.prepareStatement(sqlSb);
        preparedStatement.execute();
        connection.commit();
        //release the statement
        preparedStatement.close();


    }

}
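For ad-hoc testing, the helper can also be driven from a plain main method; a minimal sketch, reusing the DIM_BASE_TRADEMARK table from the test section below (the class name and sample values are only for illustration; the real caller is DimSinkFunction in 8.6):

import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.utils.DruidPhoenixUtils;
import com.rosh.mall.utils.PhoenixUtil;

public class PhoenixUtilDemo {

    public static void main(String[] args) throws Exception {
        // build one dimension row; keys must match the sink table columns
        JSONObject data = new JSONObject();
        data.put("id", "14");
        data.put("tm_name", "rosh test");

        // borrow a connection, upsert, and return the connection to the pool
        DruidPooledConnection connection = DruidPhoenixUtils.getDruidDataSource().getConnection();
        PhoenixUtil.upsertValues(connection, "dim_base_trademark", data);
        connection.close();
    }
}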

8.4 constant

package com.rosh.mall.constant;


public class MaxwellConstant {

    private MaxwellConstant() {

    }

    public static final String TYPE = "type";

    public static final String INSERT_TYPE = "insert";

    public static final String UPDATE_TYPE = "update";

    public static final String DELETE_TYPE = "delete";

    public static final String INITIAL_TYPE = "bootstrap-insert";



}

package com.rosh.mall.constant;

public class MySqlConstant {

    private MySqlConstant() {

    }


    public static final String HOST_NAME = "hadoop2";

    public static final Integer PORT = 3306;

    public static final String USER_NAME = "root";

    public static final String PASSWORD = "123456";



}

package com.rosh.mall.constant;


public class PhoenixConstant {

    private PhoenixConstant() {

    }

    /**
     * Phoenix schema name
     */
    public static final String HBASE_SCHEMA = "ROSH_MALL";

    /**
     * Phoenix JDBC driver
     */
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    /**
     * Phoenix connection URL
     */
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop2:2181";

}

8.5 bean

package com.rosh.mall.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
public class TableProcess {


    /**
     * id
     */
    private Long id;

    /**
     * source table
     */
    private String sourceTable;

    /**
     * sink table
     */
    private String sinkTable;

    /**
     * sink columns
     */
    private String sinkColumns;

    /**
     * sink primary key
     */
    private String sinkPk;
    /**
     * table-creation extension
     */
    private String sinkExtend;

}

8.6 func

package com.rosh.mall.app.func;

import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.utils.DruidPhoenixUtils;
import com.rosh.mall.utils.PhoenixUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


@Slf4j
public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    /**
     * Write each dimension record to its Phoenix sink table.
     */
    @Override
    public void invoke(JSONObject jsonObject, Context context) throws Exception {
        //borrow a connection from the pool
        DruidPooledConnection connection = DruidPhoenixUtils.getDruidDataSource().getConnection();
        //write out the data
        String sinkTable = jsonObject.getString("sinkTable");
        JSONObject data = jsonObject.getJSONObject("data");
        PhoenixUtil.upsertValues(connection, sinkTable, data);
        //return the connection to the pool
        connection.close();
    }
}

package com.rosh.mall.app.func;

import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.bean.TableProcess;
import com.rosh.mall.constant.PhoenixConstant;
import com.rosh.mall.utils.DruidPhoenixUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@Slf4j
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {


    private final MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    public TableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.mapStateDescriptor = mapStateDescriptor;
    }

    /**
     * Main stream: business data collected by Maxwell.
     */
    @Override
    public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext context, Collector<JSONObject> collector) throws Exception {
        //read the broadcast state
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
        String table = jsonObject.getString("table");
        TableProcess tableProcess = broadcastState.get(table);
        //the record belongs to a dimension table only if a config entry exists
        if (tableProcess != null) {
            //keep only the configured sink columns
            filterColumn(jsonObject.getJSONObject("data"), tableProcess.getSinkColumns());
            //attach sinkTable and emit downstream
            jsonObject.put("sinkTable", tableProcess.getSinkTable());
            collector.collect(jsonObject);
        }

    }

    private void filterColumn(JSONObject data, String sinkColumns) {
        List<String> fieldList = Arrays.asList(sinkColumns.split(","));
        Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Object> entry = iterator.next();
            if (!fieldList.contains(entry.getKey())) {
                iterator.remove();
            }
        }
    }

    /**
     * Broadcast stream: configuration-table changes captured by Flink CDC.
     * Sample record: {"before":null,"after":{"id":1,"source_table":"activity_info","sink_table":"dim_activity_info","sink_columns":"id,activity_name,activity_type,activity_desc,start_time,end_time,create_time","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1674004205649,"snapshot":"false","db":"rosh_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1674004205653,"transaction":null}
     */
    @Override
    public void processBroadcastElement(String value, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context context, Collector<JSONObject> collector) throws Exception {

        //parse the change event
        JSONObject jsonObject = JSON.parseObject(value);
        String after = jsonObject.getString("after");
        String before = jsonObject.getString("before");
        if (StringUtils.isNotBlank(after)) {
            //snapshot read, insert or update: call create table if not exists
            TableProcess tableProcess = JSON.parseObject(after, TableProcess.class);
            //validate and create the table if necessary
            checkTable(tableProcess);
            //store the config entry in the broadcast state
            BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
            broadcastState.put(tableProcess.getSourceTable(), tableProcess);
        } else {
            //the config row was deleted
            TableProcess tableProcess = JSON.parseObject(before, TableProcess.class);
            //drop the table
            deleteTable(tableProcess);
            //remove the entry from the broadcast state
            BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
            broadcastState.remove(tableProcess.getSourceTable());
        }

    }

    private void deleteTable(TableProcess tableProcess) {

        String sinkTable = tableProcess.getSinkTable();
        String deleteSql = "drop table " + PhoenixConstant.HBASE_SCHEMA + "." + sinkTable;
        log.info("TableProcessFunction invoke deleteTable deleteSql:{}", deleteSql);

        PreparedStatement preparedStatement = null;
        DruidPooledConnection connection = null;
        try {
            //borrow a connection from the pool
            connection = DruidPhoenixUtils.getDruidDataSource().getConnection();
            //execute the drop statement
            preparedStatement = connection.prepareStatement(deleteSql);
            preparedStatement.execute();
        } catch (SQLException e) {
            log.info("TableProcessFunction invoke deleteTable failed ", e);
            throw new RuntimeException(e);
        } finally {
            //release the statement and return the connection to the pool even on failure
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void checkTable(TableProcess tableProcess) {
        //handle optional fields
        String sinkPk = tableProcess.getSinkPk();
        String sinkExtend = tableProcess.getSinkExtend();
        if (StringUtils.isBlank(sinkPk)) {
            sinkPk = "id";
        }
        if (StringUtils.isBlank(sinkExtend)) {
            sinkExtend = "";
        }
        //build the DDL: create table if not exists db.tn(id varchar primary key,bb varchar,cc varchar)
        StringBuilder sqlSb = new StringBuilder("create table if not exists ")
                .append(PhoenixConstant.HBASE_SCHEMA)
                .append(".")
                .append(tableProcess.getSinkTable())
                .append("(");

        String sinkColumns = tableProcess.getSinkColumns();
        String[] columns = sinkColumns.split(",");
        for (int i = 0; i < columns.length; i++) {
            //append each column; the primary key column gets "varchar primary key"
            String column = columns[i];
            if (sinkPk.equals(column)) {
                sqlSb.append(column).append(" varchar primary key");
            } else {
                sqlSb.append(column).append(" varchar");
            }
            if (i == columns.length - 1) {
                sqlSb.append(")");
            } else {
                sqlSb.append(",");
            }
        }
        sqlSb.append(sinkExtend);

        log.info("TableProcessFunction invoke checkTable sql:{}", sqlSb);

        //prepare, execute and release resources
        PreparedStatement preparedStatement = null;
        DruidPooledConnection connection = null;
        try {
            //borrow a connection from the pool
            connection = DruidPhoenixUtils.getDruidDataSource().getConnection();
            //create the table
            preparedStatement = connection.prepareStatement(sqlSb.toString());
            preparedStatement.execute();
        } catch (Exception e) {
            log.info("TableProcessFunction invoke checkTable failed ", e);
            throw new RuntimeException("failed to create table", e);
        } finally {
            //release the statement and return the connection to the pool even on failure
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


}

8.7 DimApp

package com.rosh.mall.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rosh.mall.app.func.DimSinkFunction;
import com.rosh.mall.app.func.TableProcessFunction;
import com.rosh.mall.bean.TableProcess;
import com.rosh.mall.constant.MaxwellConstant;
import com.rosh.mall.constant.MySqlConstant;
import com.rosh.mall.utils.EnviromentUtils;
import com.rosh.mall.utils.KafkaUtils;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

@Slf4j
public class DimApp {

    public static void main(String[] args) throws Exception {

        //Get the execution environment; in production set the parallelism to the number of Kafka topic partitions
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //Enable checkpointing and configure the state backend; turn this on in production, leave it off for local testing
        //EnviromentUtils.setCheckpoint(env);

        //Read the Kafka db_topic and create the main data stream
        DataStreamSource<String> kafkaDS = env.addSource(KafkaUtils.getFlinkKafkaConsumer(KafkaUtils.DB_TOPIC, KafkaUtils.GROUP_ID));

        //Filter the data: keep only well-formed records whose type updates a dimension table
        SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String str, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(str);
                    String type = jsonObject.getString("type");
                    if (MaxwellConstant.INSERT_TYPE.equals(type) || MaxwellConstant.UPDATE_TYPE.equals(type) || MaxwellConstant.INITIAL_TYPE.equals(type)) {
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    log.info("DimApp flatmap 错误日志:{}", str);
                }
            }
        });

        //Broadcast stream
        //Create a Flink CDC source that reads the MySQL configuration table
        MySqlSource<String> mySqlSource = MySqlSource
                .<String>builder().hostname(MySqlConstant.HOST_NAME)
                .port(MySqlConstant.PORT)
                .username(MySqlConstant.USER_NAME)
                .password(MySqlConstant.PASSWORD)
                .databaseList("rosh_config")
                .tableList("rosh_config.table_process")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);

        //Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjectDS.connect(broadcastStream);

        //Process the connected stream: route main-stream records according to the configuration
        SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));

        //Write the data to Phoenix
        dimDS.addSink(new DimSinkFunction());

        //Submit the job
        env.execute("DimApp");

    }

}

9 Testing

9.1 Insert a row into the configuration table

INSERT INTO `rosh_config`.`table_process`( `source_table`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ( 'base_trademark', 'dim_base_trademark', 'id,tm_name', 'id', NULL);

Check the dimension table: DIM_BASE_TRADEMARK has been created under the ROSH_MALL schema.

9.2 Insert rows into the business database

INSERT INTO `rosh_mall`.`base_trademark` ( `id`, `tm_name`, `logo_url` ) VALUES	( 14, 'rosh测试', '/static/default.jpg' );
INSERT INTO `rosh_mall`.`base_trademark` ( `id`, `tm_name`, `logo_url` ) VALUES	( 15, 'rosh测试1', '/static/default.jpg' );

Check the dimension table:


select * from ROSH_MALL.DIM_BASE_TRADEMARK;


9.3 Delete the row from the configuration table

delete from table_process where id = 29

Check the dimension table: the corresponding Phoenix table has been dropped.
