【实时数仓】在Hbase建立维度表、保存维度数据到Hbase、保存业务数据到kafka主题

news2025/1/20 19:15:14

文章目录

  • 一 分流Sink之建立维度表到HBase(Phoenix)
    • 1 拼接建表语句
      • (1)定义配置常量类
      • (2)引入依赖
      • (3)hbase-site.xml
      • (4)在phoenix中执行
      • (5)增加代码
        • a TableProcessFunction
        • b checkTable
      • (6)测试
    • 2 过滤字段
      • (1)代码编写
      • (2)测试
      • (3)总结
  • 二 分流Sink之保存维度数据到HBase(Phoenix)
    • 1 程序执行流程
    • 2 DimSink
    • 3 BaseDBApp
    • 4 测试
  • 三 分流Sink之保存业务数据到Kafka主题
    • 1 BaseDBApp
    • 2 MyKafkaUtil
    • 3 测试
    • 4 总结
  • 四 总结
  • 五 附录:完整代码
    • 0 BaseDBApp
    • 1 MyKafkaUtil
    • 2 GmallConfig
    • 3 TableProcess
    • 4 MyDeserializationSchemaFunction
    • 5 TableProcessFunction
    • 6 DimSink

一 分流Sink之建立维度表到HBase(Phoenix)

1 拼接建表语句

如果读取到的配置信息是维度数据,提前在hbase中通过Phoenix创建维度表。

(1)定义配置常量类

定义一个项目中常用的配置常量类GmallConfig。

package com.hzy.gmall.realtime.common;

/**
 * 实时数仓中的常量类
 */
public class GmallConfig {
    public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}

(2)引入依赖

<!--commons-beanutils是Apache开源组织提供的用于操作JAVA BEAN的工具包。
使用commons-beanutils,可以很方便的对bean对象的属性进行操作-->
<dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.9.3</version>
</dependency>

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>

(3)hbase-site.xml

因为要用单独的schema,所以在Idea程序中加入hbase-site.xml。

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop101:8020/hbase</value>
    </property>

    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop101,hadoop102,hadoop103</value>
    </property>

    <property>
        <name>hbase.unsafe.stream.capability.enforce</name>
        <value>false</value>
    </property>

    <property>
        <name>hbase.wal.provider</name>
        <value>filesystem</value>
    </property>

    <property>
        <name>phoenix.schema.isNamespaceMappingEnabled</name>
        <value>true</value>
    </property>

    <property>
        <name>phoenix.schema.mapSystemTablesToNamespace</name>
        <value>true</value>
    </property>

</configuration>

注意:为了开启hbase的namespace和phoenix的schema的映射,在程序中需要加这个配置文件,另外在linux服务上,也需要在hbase以及phoenix的hbase-site.xml配置文件中,加上以上后两个配置,并使用xsync进行同步

/opt/module/hbase-2.0.5/conf  注意分发
/opt/module/phoenix-5.0.0/bin

重启Hbase服务

(4)在phoenix中执行

create schema GMALL2022_REALTIME;
# 在hbase查看是否创建成功
cd /opt/module/hbase-2.0.5/bin/
hbase shell
list_namespace

(5)增加代码

a TableProcessFunction

// 声明连接对象
private Connection conn;

@Override
public void open(Configuration parameters) throws Exception {
    // 注册驱动
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
    // 获取连接
    conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
// 如果读取到的配置信息是维度数据,提前创建维度表
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){
    checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);
}

// 将配置信息放到状态中
// 拼接key
String key = sourceTable + ":" + operateType;
broadcastState.put(key,tableProcess);

b checkTable

// 在处理配置数据时,提前建立维度表
// create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)
private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {
    // 对主键进行空值处理
    if (pk == null){
        pk = "id";
    }
    // 对建表扩展进行空值处理
    if (ext == null){
        ext = "";
    }
    StringBuilder createSql = new StringBuilder("create table if not exists "+
            GmallConfig.HBASE_SCHEMA + "." + tableName +"(");

    String[] fieldArr = fields.split(",");
    for (int i = 0; i < fieldArr.length; i++) {
        String field = fieldArr[i];
        // 判断是否为主键
        if (field.equals(pk)){
            createSql.append(field + " varchar primary key ");
        }else {
            createSql.append(field + " varchar ");
        }
        if(i < fieldArr.length - 1){
            createSql.append(",");
        }
    }
    createSql.append(")" + ext);
    System.out.println("phoenix中的建表语句:" + createSql);


    // 创建数据库操作对象
    PreparedStatement ps = null;
    try {
        ps = conn.prepareStatement(createSql.toString());
        // 执行sql语句
        ps.execute();
    } catch (SQLException e) {
        e.printStackTrace();
        throw new RuntimeException("在phoenix中建表失败");
    } finally {
        // 释放资源
        if(ps != null){
            ps.close();
        }
    }
}

(6)测试

启动hadoop(等待安全模式关闭再进行下一步),zookeeper,kafka,hbase,phoenix,maxwell,在phoenix中查看表数据
!tables
select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;

2 过滤字段

数据在向下游传递之前,过滤掉不需要的字段,只保留配置表中sink_columns存在的字段。

(1)代码编写

if (tableProcess != null){
    // 在配置表中找到了该操作对应的配置
    // 判断是事实数据还是维度数据
    String sinkTable = tableProcess.getSinkTable();
    jsonObj.put("sink_table",sinkTable);

    // 在向下游传递数据之前,将不需要的字段过滤掉
    // 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤
    JSONObject dataJsonObj = jsonObj.getJSONObject("data");
    filterColumns(dataJsonObj,tableProcess.getSinkColumns());

    String sinkType = tableProcess.getSinkType();
    if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
        // 维度数据,放到维度侧输出流中
        ctx.output(dimTag,jsonObj);
    }else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
        // 事实数据,放到主流中
        out.collect(jsonObj);
    }
}else {
    // 在配置表中没有该操作对应的配置
    System.out.println("No This Key In TableProcess:" + key);
}
// 过滤字段
    private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {
        // dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}
        // sinkColumns : id,tm_name
        String[] columnArr = sinkColumns.split(",");
        // 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法
        List<String> columnList = Arrays.asList(columnArr);

        // 获取json中的每一个名值对(KV)
        Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
//        // 获取迭代器
//        Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
//        // 遍历,如果不包含则删除
//        for (;it.hasNext();) {
//            if(!columnList.contains(it.next().getKey())){
//                it.remove();
//            }
//        }
        entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));
    }

(2)测试

开启相关环境,在表中添加或者删除数据,查看输出结果。

(3)总结

在这里插入图片描述

动态分流总结:

  • 广播流数据处理,FlinkCDC从MySQL中读取配置信息
    • 获取状态
    • 读取配置**(维度表的创建)**
    • 将配置封装为Map<sourceTable:operateType,TableProcess>放到状态中。
  • 主流数据处理,maxwell从业务数据库中采集到的数据
    • 获取状态
    • 从状态中获取当前处理数据的配置信息(字段过滤)
    • 根据配置信息进行分流(事实与维度)
  • 维度表的创建
    • 拼接建表语句
    • 通过jdbc执行建表语句
    • 测试
      • schema和namespace的映射
      • 拼接sql,空格处理
      • 提前创建schema
  • 字段过滤
    • 获取保留字段,放到List集合中
    • 获取dataJsonObj,转换为EntrySet
    • 根据保留字段判断EntrySet遍历出来的entry,是否保留

二 分流Sink之保存维度数据到HBase(Phoenix)

1 程序执行流程

在这里插入图片描述

DimSink 继承了RichSinkFunction,这个function得分两条时间线。

  • 一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行。
  • 另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。

2 DimSink

package com.hzy.gmall.realtime.app.fun;
/**
 * 将维度侧输出流的数据写到hbase(Phoenix)中
 */
public class DimSink extends RichSinkFunction<JSONObject> {
    private Connection conn;


    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
        // 上游传递过来的数据格式如下:
        // {"database":"gmall2022",
        // "data":{"tm_name":"a","id":13},
        // "commit":true,
        // "sink_table":"dim_base_trademark",
        // "type":"insert",
        // "table":"base_trademark","
        // ts":1670131087}

        // 获取维度表表名
        String tableName = jsonObj.getString("sink_table");
        // 获取数据
        JSONObject dataJsonObj = jsonObj.getJSONObject("data");
        // 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);
        String upsertSql = genUpsertSql(tableName,dataJsonObj);

        System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);

        PreparedStatement ps = null;
        try {
            // 创建数据库操作对象
            ps = conn.prepareStatement(upsertSql);
            // 执行sql语句
            ps.executeUpdate();
            // 手动提交事务,phoenix的连接实现类不是自动提交事务
            conn.commit();
        }catch (SQLException e){
            e.printStackTrace();
            throw new RuntimeException("向phoenix维度表中插入数据失败了");
        } finally {
            // 释放资源
            if (ps != null){
                ps.close();
            }
        }
    }

    // 拼接插入语句
    private String genUpsertSql(String tableName, JSONObject dataJsonObj) {
        // id 10
        // tm_name zs
        String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +
                " ("+ StringUtils.join(dataJsonObj.keySet(),",") +
                ") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";
        return upsertSql;
    }
}

3 BaseDBApp

//TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中
dimDS.addSink(new DimSink());

4 测试

开启必要环境,向base_tradmark表中添加一条数据,查看phoenix是否插入成功。

三 分流Sink之保存业务数据到Kafka主题

1 BaseDBApp

//TODO 9 将主流数据写回kafka的dwd层
realDS.addSink(
        MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
                return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),
                        jsonObj.getJSONObject("data").toJSONString().getBytes());
            }
        })
);

2 MyKafkaUtil

package com.hzy.gmall.realtime.utils;
// 获取kafka的生产者
// 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
//    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
//        return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
//    }
    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
                return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());
            }
        },props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    // 获取kafka的生产者
    public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

3 测试

在base_trademark表中添加一条数据,查看程序输出结果,在phoenix中查看结果。

维度数据::3> {"database":"gmall2022","xid":24993,"data":{"tm_name":"c","id":14},"commit":true,"sink_table":"dim_base_trademark","type":"insert","table":"base_trademark","ts":1670150491}
向phoenix维度表中插入数据的sql:upsert into GMALL2022_REALTIME.dim_base_trademark (tm_name,id) values('c','14')

select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;

启动kafka消费者,kfkcon.sh dwd_order_info,在order_info中修改一条数据,查看程序输出结果,在kafka中查看结果。

事实数据::3> {"database":"gmall2022","xid":25144,"data":{"delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00},"old":{"consignee":"苗冰"},"commit":true,"sink_table":"dwd_order_info","type":"update","table":"order_info","ts":1670150618}
kafka中输出内容
{"delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00}

4 总结

动态分流测试执行流程

  • 需要启动的进程:zookeeper,kafka,maxwell,hdfs,hbase,BaseDBApp

  • 当业务数据发生变化,maxwell会采集变化数据到ods层

  • BaseDBApp从ods层读取到变化数据,作为业务主流

  • BaseDBApp在启动时,会通过FlinkCDC读取配置表,作为广播流

  • 业务流和广播流通过connect进行连接

  • 对连接之后的数据通过process进行处理

    • processElement
    • processBroadcastElement

    具体执行流程见一 2(3)总结

  • 将维度侧输出流的数据写到Hbase中 – DimSink

    • 拼接upsert
    • 执行sql(手动提交事务)
  • 将主流数据写回到kafka的dwd层

    • 重写获取FlinkKafkaProducer的方法,自定义序列化的过程
    • 将主流的数据写到kafka不同的主题中,并且保存精准一次性

四 总结

DWD的实时计算核心就是数据分流,其次是状态识别。在开发过程中使用了几个灵活度较强算子,比如RichMapFunction, ProcessFunction, RichSinkFunction。 那这几个算子何时会用到,如何进行选择,汇总见下表:

Function可转换结构可过滤数据侧输出open****方法可以使用状态输出至
MapFunctionYesNoNoNoNo下游算子
FilterFunctionNoYesNoNoNo下游算子
RichMapFunctionYesNoNoYesYes下游算子
RichFilterFunctionNoYesNoYesYes下游算子
ProcessFunctionYesYesYesYesYes下游算子
SinkFunctionYesYesNoNoNo外部
RichSinkFunctionYesYesNoYesYes外部

从对比表中能明显看出,Rich系列能功能强大,ProcessFunction功能更强大,但是相对的越全面的算子使用起来也更加繁琐。

五 附录:完整代码

0 BaseDBApp

package com.hzy.gmall.realtime.app.dwd;

public class BaseDBApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 基本环境准备
        //流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(4);

//        //TODO 2 检查点设置
//        //开启检查点
//        env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);
//        // 设置检查点超时时间
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);
//        // 设置重启策略
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
//        // 设置job取消后,检查点是否保留
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
//        // 指定操作HDFS的用户
//        System.setProperty("HADOOP_USER_NAME","hzy");

        //TODO 3 从kafka中读取数据
        //声明消费的主题以及消费者组
        String topic = "ods_base_db_m";
        String groupId = "base_db_app_group";
        // 获取消费者对象
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        // 读取数据,封装成流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 4 对数据类型进行转换 String -> JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        //TODO 5 简单的ETL
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(
                new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonobj) throws Exception {
                        boolean flag =
                                jsonobj.getString("table") != null &&
                                        jsonobj.getString("table").length() > 0 &&
                                        jsonobj.getJSONObject("data") != null &&
                                        jsonobj.getString("data").length() > 3;
                        return flag;
                    }
                }
        );
//        filterDS.print("<<<");

        //TODO 6 使用FlinkCDC读取配置表数据
        //获取dataSource
        Properties props = new Properties();
        props.setProperty("scan.startup.mode","initial");
        SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                // 可配置多个库
                .databaseList("gmall2022_realtime")
                ///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
                //注意:指定的时候需要使用"db.table"的方式
                .tableList("gmall2022_realtime.table_process")
                .debeziumProperties(props)
                .deserializer(new MyDeserializationSchemaFunction())
                .build();

        // 读取数据封装流
        DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);

        // 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去
        // 想要使用广播状态,状态描述器只能是map,使用map状态存储
        MapStateDescriptor<String, TableProcess> mapStateDescriptor =
                new MapStateDescriptor<>("table_process", String.class, TableProcess.class);

        BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);

        // 调用非广播流的connect方法,将业务流与配置流进行连接
        BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);

        //TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中
        //声明维度侧输出流的标记
        OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};
        SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
                new TableProcessFunction(dimTag,mapStateDescriptor)
        );
        // 获取维度侧输出流
        DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);
        realDS.print("事实数据:");
        dimDS.print("维度数据:");

        //TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中
        dimDS.addSink(new DimSink());

        //TODO 9 将主流数据写回kafka的dwd层
        realDS.addSink(
                MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
                        return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),
                                jsonObj.getJSONObject("data").toJSONString().getBytes());
                    }
                })
        );

        env.execute();
    }
}

1 MyKafkaUtil

package com.hzy.gmall.realtime.utils;
/**
 * 操作kafka工具类
 */
public class MyKafkaUtil {
    private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
    private static final String DEFAULT_TOPIC = "default_topic";

    // 获取kafka的消费者
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        // 定义消费者组
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),props);
    }

    // 获取kafka的生产者
    // 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
//    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
//        return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
//    }
    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
                return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());
            }
        },props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    // 获取kafka的生产者
    public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}

2 GmallConfig

package com.hzy.gmall.realtime.common;

/**
 * 实时数仓中的常量类
 */
public class GmallConfig {
    public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}

3 TableProcess

package com.hzy.gmall.realtime.beans;

import lombok.Data;
@Data
public class TableProcess {
    //动态分流Sink常量   改为小写和脚本一致
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //来源表
    String sourceTable;
    //操作类型 insert,update,delete
    String operateType;
    //输出类型 hbase kafka
    String sinkType;
    //输出表(主题)
    String sinkTable;
    //输出字段
    String sinkColumns;
    //主键字段
    String sinkPk;
    //建表扩展
    String sinkExtend;
}

4 MyDeserializationSchemaFunction

package com.hzy.gmall.realtime.app.fun;

public class MyDeserializationSchemaFunction  implements DebeziumDeserializationSchema<String> {
    // 反序列化
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 导入的是org.apache.kafka.connnect.data包
        Struct valueStruct = (Struct) sourceRecord.value();
        // 获取数据的来源
        Struct afterStruct = valueStruct.getStruct("after");
        // 获取数据库和表名的来源
        Struct sourceStruct = valueStruct.getStruct("source");
        // 获取数据库
        String database = sourceStruct.getString("db");
        // 获取表名
        String table = sourceStruct.getString("table");
        // 获取操作类型
//        String op = valueStruct.getString("op");
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if(type.equals("create")){
            type = "insert";
        }
        JSONObject jsonObj = new JSONObject();
        jsonObj.put("database",database);
        jsonObj.put("table",table);
        jsonObj.put("type",type);
        // 获取影响的数据
        // 删除时,afterStruct为空
        JSONObject dataJsonObj = new JSONObject();
        if (afterStruct != null){
            // schema获取源数据的格式,fields获取里面的各个元素
            for (Field field : afterStruct.schema().fields()) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(field);
                dataJsonObj.put(fieldName,fieldValue);
            }
        }
        // 删除操作会使得data属性不为空,但size为0
        jsonObj.put("data",dataJsonObj);

        // 向下游发送数据
        collector.collect(jsonObj.toJSONString()) ;
    }

    // 指定类型
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

5 TableProcessFunction

package com.hzy.gmall.realtime.app.fun;

/**
 * 实现动态分流
 * 目前流中有两条流中的数据,使用以下两个方法分别进行处理
 */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    // 声明维度侧输出流标签
    private OutputTag<JSONObject> dimTag;
    // 声明广播状态描述器
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    // 声明连接对象
    private Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册驱动
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        // 获取连接
        conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.dimTag = dimTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    // 处理业务流中的数据,maxwell从业务数据库中采集到的数据
    @Override
    public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        String table = jsonObj.getString("table");
        String type = jsonObj.getString("type");

        // 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insert
        if (type.equals("bootstrap-insert")){
            type = "insert";
            jsonObj.put("type",type);
        }

        // 拼接key
        String key = table  + ":" + type;
        // 获取状态
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // 从状态中获取配置信息
        TableProcess tableProcess = broadcastState.get(key);

        if (tableProcess != null){
            // 在配置表中找到了该操作对应的配置
            // 判断是事实数据还是维度数据
            String sinkTable = tableProcess.getSinkTable();
            jsonObj.put("sink_table",sinkTable);

            // 在向下游传递数据之前,将不需要的字段过滤掉
            // 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤
            JSONObject dataJsonObj = jsonObj.getJSONObject("data");
            filterColumns(dataJsonObj,tableProcess.getSinkColumns());

            String sinkType = tableProcess.getSinkType();
            if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
                // 维度数据,放到维度侧输出流中
                ctx.output(dimTag,jsonObj);
            }else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
                // 事实数据,放到主流中
                out.collect(jsonObj);
            }
        }else {
            // 在配置表中没有该操作对应的配置
            System.out.println("No This Key In TableProcess:" + key);
        }
    }

    // 过滤字段
    private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {
        // dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}
        // sinkColumns : id,tm_name
        String[] columnArr = sinkColumns.split(",");
        // 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法
        List<String> columnList = Arrays.asList(columnArr);

        // 获取json中的每一个名值对(KV)
        Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
//        // 获取迭代器
//        Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
//        // 遍历,如果不包含则删除
//        for (;it.hasNext();) {
//            if(!columnList.contains(it.next().getKey())){
//                it.remove();
//            }
//        }
        entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));
    }

    // 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息
    @Override
    public void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
        // 获取状态
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // 将json格式字符串转换为JSON对象
        JSONObject jsonObj = JSONObject.parseObject(jsonStr);
        // 获取配置表中的一条配置信息
        // parseObject:将json格式字符串转化为json格式对象
        // 第二个参数为将json字符串转化为何种格式的对象
        TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);
        // 业务数据库表名
        String sourceTable = tableProcess.getSourceTable();
        // 操作类型
        String operateType = tableProcess.getOperateType();
        // 数据类型 hbase -- 维度数据    kafka -- 事实数据
        String sinkType = tableProcess.getSinkType();
        // 指定输出目的地
        String sinkTable = tableProcess.getSinkTable();
        // 主键
        String sinkPk = tableProcess.getSinkPk();
        // 指定保留字段(列)
        String sinkColumns = tableProcess.getSinkColumns();
        // 指定建表扩展语句
        String sinkExtend = tableProcess.getSinkExtend();

        // 如果读取到的配置信息是维度数据,提前创建维度表
        if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){
            checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);
        }

        // 将配置信息放到状态中
        // 拼接key
        String key = sourceTable + ":" + operateType;
        broadcastState.put(key,tableProcess);
    }

    // 在处理配置数据时,提前建立维度表
    // create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)
    private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {
        // 对主键进行空值处理
        if (pk == null){
            pk = "id";
        }
        // 对建表扩展进行空值处理
        if (ext == null){
            ext = "";
        }
        StringBuilder createSql = new StringBuilder("create table if not exists "+
                GmallConfig.HBASE_SCHEMA + "." + tableName +"(");

        String[] fieldArr = fields.split(",");
        for (int i = 0; i < fieldArr.length; i++) {
            String field = fieldArr[i];
            // 判断是否为主键
            if (field.equals(pk)){
                createSql.append(field + " varchar primary key ");
            }else {
                createSql.append(field + " varchar ");
            }
            if(i < fieldArr.length - 1){
                createSql.append(",");
            }
        }
        createSql.append(")" + ext);
        System.out.println("phoenix中的建表语句:" + createSql);


        // 创建数据库操作对象
        PreparedStatement ps = null;
        try {
            ps = conn.prepareStatement(createSql.toString());
            // 执行sql语句
            ps.execute();
        } catch (SQLException e) {
            e.printStackTrace();
            throw new RuntimeException("在phoenix中建表失败");
        } finally {
            // 释放资源
            if(ps != null){
                ps.close();
            }
        }
    }
}

6 DimSink

package com.hzy.gmall.realtime.app.fun;

/**
 * 将维度侧输出流的数据写到hbase(Phoenix)中
 */
public class DimSink extends RichSinkFunction<JSONObject> {
    private Connection conn;


    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
        // 上游传递过来的数据格式如下:
        // {"database":"gmall2022",
        // "data":{"tm_name":"a","id":13},
        // "commit":true,
        // "sink_table":"dim_base_trademark",
        // "type":"insert",
        // "table":"base_trademark","
        // ts":1670131087}

        // 获取维度表表名
        String tableName = jsonObj.getString("sink_table");
        // 获取数据
        JSONObject dataJsonObj = jsonObj.getJSONObject("data");
        // 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);
        String upsertSql = genUpsertSql(tableName,dataJsonObj);

        System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);

        PreparedStatement ps = null;
        try {
            // 创建数据库操作对象
            ps = conn.prepareStatement(upsertSql);
            // 执行sql语句
            ps.executeUpdate();
            // 手动提交事务,phoenix的连接实现类不是自动提交事务
            conn.commit();
        }catch (SQLException e){
            e.printStackTrace();
            throw new RuntimeException("向phoenix维度表中插入数据失败了");
        } finally {
            // 释放资源
            if (ps != null){
                ps.close();
            }
        }
    }

    // 拼接插入语句
    private String genUpsertSql(String tableName, JSONObject dataJsonObj) {
        // id 10
        // tm_name zs
        String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +
                " ("+ StringUtils.join(dataJsonObj.keySet(),",") +
                ") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";
        return upsertSql;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/92947.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker安装简单命令

一、 Docker是什么? 要了解Docker&#xff0c;首先要了解什么是容器&#xff1f; 容器是一个软件的轻量级独立可执行软件包&#xff0c;包含运行它所需的一切&#xff1a;代码&#xff0c;运行时&#xff0c;系统工具&#xff0c;系统库&#xff0c;设置。不管环境如何&…

C++(第十二篇):多态(虚函数、抽象类、虚函数表、虚表指针、多继承下的多态)

&#x1f4d2;博客主页&#xff1a;Morning_Yang丶 &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f4cc;本文所属专栏&#xff1a; 【C拒绝从入门到跑路】 &#x1f64f;作者水平有限&#xff0c;如果发现错误&#xff0c;敬请指正&…

13485-59-1,二肽Ala-Pro,H2N-AP-OH

Substrate for skin fibroblast prolidase.皮肤成纤维细胞prolida酶的底物。 编号: 199181中文名称: 二肽Ala-Pro英文名: Ala-ProCAS号: 13485-59-1单字母: H2N-AP-OH三字母: H2N-Ala-Pro-COOH氨基酸个数: 2分子式: C8H14N2O3平均分子量: 186.21精确分子量: 186.1等电点(PI): 6…

web前端期末大作业:红色主题中国文化网页设计与实现——基于HTML+CSS实现中国梦(20页)

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

电脑技巧:Win11系统新增的磁盘分区功能介绍

很多用户发现&#xff0c;新买的电脑中出厂自带的硬盘只有一个分区&#xff0c;目前只有少部分电脑机型出厂会有分多个分区的。 磁盘的单一分区往往不能满足我们的使用需要&#xff0c;Win7/8/10我们都知道可以在磁盘管理下通过压缩卷的方式来分区&#xff0c;Win11操作系统又新…

C# 程序的错误与调试

一 错误的zhongl 程序的错误通常可以分为三大类 ① 语法错误 ② 运行错误 ③ 逻辑错误 二 语法错误 1 常见的语法错误 ① 如括号不配对&#xff0c;多了或少了分号&#xff1b; ② 字母写错&#xff0c;变量未定义&#xff0c;控件命名写错&#xff1b; ③ 函数少了一个参数…

2022面试官常考的前端面试题

Ajax 它是一种异步通信的方法&#xff0c;通过直接由 js 脚本向服务器发起 http 通信&#xff0c;然后根据服务器返回的数据&#xff0c;更新网页的相应部分&#xff0c;而不用刷新整个页面的一种方法。 面试手写&#xff08;原生&#xff09;&#xff1a; //1&#xff1a;创建…

大一学生《Web编程基础》期末网页制作 HTML+CSS鲜花网页设计实例

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

NLP学习笔记(二) LSTM基本介绍

大家好&#xff0c;我是半虹&#xff0c;这篇文章来讲长短期记忆网络 (Long Short-Term Memory, LSTM) 文章行文思路如下&#xff1a; 首先通过循环神经网络引出为啥需要长短期记忆网络然后介绍长短期记忆网络的核心思想与运作方式最后通过简短的代码深入理解长短期记忆网络的…

Java面试题总结-hashcode和equals

前段时间有朋友问我&#xff1a;“你重写过 hashcode 和 equals 么&#xff0c;为什么重写 equals 时必须重写 hashCode 方法&#xff1f;” 之前的学习中有深入了解过&#xff0c;后来很久没复习了&#xff0c;淡忘许多&#xff0c;回答的时候也有很多地方卡壳&#xff0c;干脆…

【数据结构Java版】Queue队列的活用

目录 一、队列的定义 二、队列的使用 &#xff08;1&#xff09;主要方法 &#xff08;2&#xff09;实例演示 ​&#xff08;3&#xff09;注意事项 三、队列的模拟实现 四、循环队列 &#xff08;1&#xff09;循环队列定义 ​&#xff08;2&#xff09;循环队列的表…

web前端期末大作业:美食文化网页设计与实现——美食餐厅三级(HTML+CSS+JavaScript)

&#x1f468;‍&#x1f393;静态网站的编写主要是用HTML DIVCSS JS等来完成页面的排版设计&#x1f469;‍&#x1f393;,常用的网页设计软件有Dreamweaver、EditPlus、HBuilderX、VScode 、Webstorm、Animate等等&#xff0c;用的最多的还是DW&#xff0c;当然不同软件写出的…

Cambridge IGCSE Mathematics真题讲解1

考试局&#xff1a;Cambridge Assessment International Education (CAIE)考试类别&#xff1a;Cambridge International General Certificate of Secondary Education (IGCSE)考试科目&#xff1a;Mathematics考试单元&#xff1a;Paper 2 (Extended)试卷代码&#xff1a;0580…

全栈Jmeter接口测试(十四):跨线程组传递jmeter变量及cookie的处理

setUp线程组 setUp thread group&#xff1a; 一种特殊类型的线程组&#xff0c;用于在执行常规线程组之前执行一些必要的操作。 在 setup线程组下提到的线程行为与普通线程组完全相同。不同的是执行顺序--- 它会在普通线程组执行之前被触发&#xff1b; 应用场景举例&#xf…

大二Web课程设计:服装网页设计题材——HTML+CSS汉服文化带背景音乐素材带视频(12页)

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

SQLAlchemy

一 概述 SQLAlchemy是 SQL工具包和对象关系映射器用于使用 数据库和 Python。它有几个不同的区域 &#xff0c;可单独使用或组合使用。其主要组成部分如下所示&#xff0c; 将组件依赖项组织成层&#xff1a; 上面两个最重要的部分 SQLAlchemy是对象关系映射器&#xff08;OR…

联盟营销是什么?和网红营销有什么区别?

之前讲过一篇关于联盟营销文章相关的&#xff0c;发现大家都很感兴趣&#xff0c;今天东哥就专门写一篇更全面的文章给大家好好介绍一下联盟营销以及它跟网红营销有什么区别吗&#xff1f; 联盟营销是什么&#xff1f; 联盟营销是一种根据营销效果付费的营销模式。商家利用第三…

Flink 运行错误 java.lang.OutOfMemoryError: Direct buffer memory

如遇到如下错误&#xff0c;表示需要调大配置项 taskmanager.memory.framework.off-heap.size 的值&#xff0c;taskmanager.memory.framework.off-heap.size 的默认值为 128MB&#xff0c;错误显示不够用需要调大。 2022-12-16 09:09:21,633 INFO [464321] [org.apache.hadoo…

西门子Siemens EDI需求分析及解决方案

西门子股份公司是一家专注于工业、基础设施、交通和医疗领域的科技公司&#xff0c;始终致力于做到订单、供应以及财务流程的安全、经济、高效&#xff0c;并努力提高自身与交易伙伴之间电子商务的互惠互利。为了提高与交易伙伴之间的数据传输效率&#xff0c;西门子Siemens ED…

1571_AURIX_TC275_ERU寄存器以及锁步控制

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 这些寄存器bits其实是对应了MCU的信号路由设计。 FC其实是flag clear的一个缩写&#xff0c;这样可以明确弄清楚前面文字的描述。对应的&#xff0c;FS&#xff0c;其实是flag set。下面的…