【实时数仓】动态分流的实现源码(反序列化器、配置表、广播流、业务流)

news2024/11/24 16:54:54

文章目录

  • 一 根据MySQL的配置表,动态进行分流
    • 1 自定义反序列化器
      • (1)需求分析
      • (2)代码实现
    • 2 从配置表中读取数据
      • (1)自定义CDC采集的反序列化器
      • (2)使用FlinkCDC读取配置表数据
    • 3 定义广播状态
    • 4 完成动态分流 -- 广播流
      • (1)TableProcessFunction
      • (2)主程序
      • (3)测试
    • 5 完成动态分流 -- 业务流
      • (1)TableProcessFunction
      • (2)测试
    • 6 完整代码
      • (1)主程序
      • (2)TableProcessFunction
      • (3)MyDeserializationSchemaFunction
      • (4)总结

一 根据MySQL的配置表,动态进行分流

动态分流整体思路:

在这里插入图片描述

1 自定义反序列化器

(1)需求分析

使用FLinkCDC API方式获取到的内容如下

// 封装的数据对象类型
SourceRecord{
	sourcePartition={server=mysql_binlog_source}, 
	sourceOffset={file=mysql-bin.000004, pos=1343, row=1, snapshot=true}
} 
// ConnectRecord 是 SourceRecord的父类
ConnectRecord{
	topic='mysql_binlog_source.gmall2022_realtime.t_user', 
	kafkaPartition=null, 
	key=Struct{id=1}, 
	keySchema=Schema{mysql_binlog_source.gmall2022_realtime.t_user.Key:STRUCT},
value=Struct{
after=Struct{id=1,name=zhangsan,age=18},
source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall2022_realtime,table=t_user,server_id=0,file=mysql-bin.000004,pos=1343,row=0},
op=c,ts_ms=1669896172346}, 
valueSchema=Schema{mysql_binlog_source.gmall2022_realtime.t_user.Envelope:STRUCT}, 
timestamp=null, 
headers=ConnectHeaders(headers=)}

自动定义反序列化器,实现将以上信息转化为JSON格式的字符串,包换的属性包括:数据库名称,表名,操作类型,影响的数据内容。

以上内容中有价值的信息如下:

ConnectRecord{
	value=Struct{
		after=Struct{id=1,name=zhangsan,age=18},
		source=Struct{
			db=gmall2022_realtime,
			table=t_user
		},
		op=c
	},
}		

(2)代码实现

/**
 * 通过FlinkCDC动态读取MySQL表中的数据 -- DataStreamAPI
 *
 * 自定义反序列化器
 */
public class FlinkCDC03_CustomerSchema {
    public static void main(String[] args) throws Exception {
        //TODO 1 准备流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 3 创建Flink-MySQL-CDC的Source
        Properties props = new Properties();
        props.setProperty("scan.startup.mode","initial");
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                // 可配置多个库
                .databaseList("gmall2022_realtime")
                ///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
                //注意:指定的时候需要使用"db.table"的方式
                .tableList("gmall2022_realtime.t_user")
                .debeziumProperties(props)
                .deserializer(new MySchema())
                .build();

        //TODO 4 使用CDC Source从MySQL读取数据
        DataStreamSource<String> mysqlDS = env.addSource(sourceFunction);

        //TODO 5 打印输出
        mysqlDS.print();

        //TODO 6 执行任务
        env.execute();
    }
}

class MySchema implements DebeziumDeserializationSchema<String> {

    /**
     * ConnectRecord{
     *     value=Struct{
     *        after=Struct{id=1,name=zhangsan,age=18},
     *        source=Struct{
     *           db=gmall2022_realtime,
     *           table=t_user
     *                },
     *        op=c
     *     },
     * }
     */
    // 反序列化
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 导入的是org.apache.kafka.connnect.data包
        Struct valueStruct = (Struct) sourceRecord.value();
        // 获取数据的来源
        Struct afterStruct = valueStruct.getStruct("after");
        // 获取数据库和表名的来源
        Struct sourceStruct = valueStruct.getStruct("source");
        // 获取数据库
        String database = sourceStruct.getString("db");
        // 获取表名
        String table = sourceStruct.getString("table");
        // 获取操作类型
//        String op = valueStruct.getString("op");
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if(type.equals("create")){
            type = "insert";
        }
        JSONObject jsonObj = new JSONObject();
        jsonObj.put("database",database);
        jsonObj.put("table",table);
        jsonObj.put("type",type);
        // 获取影响的数据
        // 删除时,afterStruct为空
        JSONObject dataJsonObj = new JSONObject();
        if (afterStruct != null){
            // schema获取源数据的格式,fields获取里面的各个元素
            for (Field field : afterStruct.schema().fields()) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(field);
                dataJsonObj.put(fieldName,fieldValue);
            }
        }
        // 删除操作会使得data属性不为空,但size为0
        jsonObj.put("data",dataJsonObj);

        // 向下游发送数据
        collector.collect(jsonObj.toJSONString()) ;
    }

    // 指定类型
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

2 从配置表中读取数据

(1)自定义CDC采集的反序列化器

将CDC格式数据转换为json字符串

package com.hzy.gmall.realtime.app.fun;

public class MyDeserializationSchemaFunction  implements DebeziumDeserializationSchema<String> {
    // 反序列化
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 导入的是org.apache.kafka.connnect.data包
        Struct valueStruct = (Struct) sourceRecord.value();
        // 获取数据的来源
        Struct afterStruct = valueStruct.getStruct("after");
        // 获取数据库和表名的来源
        Struct sourceStruct = valueStruct.getStruct("source");
        // 获取数据库
        String database = sourceStruct.getString("db");
        // 获取表名
        String table = sourceStruct.getString("table");
        // 获取操作类型
//        String op = valueStruct.getString("op");
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if(type.equals("create")){
            type = "insert";
        }
        JSONObject jsonObj = new JSONObject();
        jsonObj.put("database",database);
        jsonObj.put("table",table);
        jsonObj.put("type",type);
        // 获取影响的数据
        // 删除时,afterStruct为空
        JSONObject dataJsonObj = new JSONObject();
        if (afterStruct != null){
            // schema获取源数据的格式,fields获取里面的各个元素
            for (Field field : afterStruct.schema().fields()) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(field);
                dataJsonObj.put(fieldName,fieldValue);
            }
        }
        // 删除操作会使得data属性不为空,但size为0
        jsonObj.put("data",dataJsonObj);

        // 向下游发送数据
        collector.collect(jsonObj.toJSONString()) ;
    }

    // 指定类型
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

(2)使用FlinkCDC读取配置表数据

//TODO 6 使用FlinkCDC读取配置表数据
//获取dataSource
Properties props = new Properties();
props.setProperty("scan.startup.mode","initial");
SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder()
        .hostname("hadoop101")
        .port(3306)
        .username("root")
        .password("123456")
        // 可配置多个库
        .databaseList("gmall2022_realtime")
        ///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
        //注意:指定的时候需要使用"db.table"的方式
        .tableList("gmall2022_realtime.table_process")
        .debeziumProperties(props)
        .deserializer(new MyDeserializationSchemaFunction())
        .build();

// 读取数据封装流
DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);

程序中目前存在两条流:

  • 数据主流
  • 配置表流

3 定义广播状态

Flink广播状态。

为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去

// 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去
// 想要使用广播状态,状态描述器只能是map
MapStateDescriptor<String, TableProcess> mapStateDescriptor =
        new MapStateDescriptor<>("table_process", String.class, TableProcess.class);

BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);

// 调用非广播流的connect方法,将业务流与配置流进行连接
BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);

4 完成动态分流 – 广播流

处理广播流中的数据,广播流对状态有写的权限。

(1)TableProcessFunction

package com.hzy.gmall.realtime.app.fun;

/**
 * 实现动态分流
 * 目前流中有两条流中的数据,使用以下两个方法分别进行处理
 */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    // 声明维度侧输出流标签
    private OutputTag<JSONObject> dimTag;
    // 声明广播状态描述器
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.dimTag = dimTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    // 处理业务流中的数据,maxwell从业务数据库中采集到的数据
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {

    }

    // 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息
    @Override
    public void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
        // 获取状态
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // 将json格式字符串转换为JSON对象
        JSONObject jsonObj = JSONObject.parseObject(jsonStr);
        // 获取配置表中的一条配置信息
        // parseObject:将json格式字符串转化为json格式对象
        // 第二个参数为将json字符串转化为何种格式的对象
        TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);
        // 业务数据库表名
        String sourceTable = tableProcess.getSourceTable();
        // 操作类型
        String operateType = tableProcess.getOperateType();
        // 数据类型 hbase -- 维度数据    kafka -- 事实数据
        String sinkType = tableProcess.getSinkType();
        // 指定输出目的地
        String sinkTable = tableProcess.getSinkTable();
        // 主键
        String sinkPk = tableProcess.getSinkPk();
        // 指定保留字段(列)
        String sinkColumns = tableProcess.getSinkColumns();
        // 指定建表扩展语句
        String sinkExtend = tableProcess.getSinkExtend();

        // 将配置信息放到状态中
        // 拼接key
        String key = sourceTable + ":" + operateType;
        broadcastState.put(key,tableProcess);
    }
}

(2)主程序

//TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中
//声明维度侧输出流的标记
OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};
SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
        new TableProcessFunction(dimTag,mapStateDescriptor)
);
// 获取维度侧输出流
DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);
realDS.print("事实数据:");
dimDS.print("维度数据:");

(3)测试

向table_process中添加几条数据

source_table    operate_type  sink_type  sink_table          sink_columns   sink_pk  sink_extend  
--------------  ------------  ---------  ------------------  --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  -------  -------------
base_trademark  insert        hbase      dim_base_trademark  id,tm_name     id       (NULL)       
order_info      update        kafka      dwd_order_info      id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time  id       (NULL)       
# 启动zookeeper、kafka、maxwell

读取配置表数据,反序列化,拼接成JSON,json格式如下:
在这里插入图片描述

将配置信息封装成tableProcess对象,如下图:

在这里插入图片描述

5 完成动态分流 – 业务流

(1)TableProcessFunction

处理业务流中的数据,业务流只能从状态中获取数据,无写的权限。

// 处理业务流中的数据,maxwell从业务数据库中采集到的数据
@Override
public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
    String table = jsonObj.getString("table");
    String type = jsonObj.getString("type");

    // 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insert
    if (type.equals("bootstrap-insert")){
        type = "insert";
        jsonObj.put("type",type);
    }

    // 拼接key
    String key = table  + ":" + type;
    // 获取状态
    ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
    // 从状态中获取配置信息
    TableProcess tableProcess = broadcastState.get(key);

    if (tableProcess != null){
        // 在配置表中找到了该操作对应的配置
        // 判断是事实数据还是维度数据
        String sinkTable = tableProcess.getSinkTable();
        jsonObj.put("sink_table",sinkTable);

        String sinkType = tableProcess.getSinkType();
        if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
            // 维度数据,放到维度侧输出流中
            ctx.output(dimTag,jsonObj);
        }else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
            // 事实数据,放到主流中
            out.collect(jsonObj);
        }
    }else {
        // 在配置表中没有该操作对应的配置
        System.out.println("No This Key In TableProcess:" + key);
    }
}

(2)测试

在base_trademark中添加一条数据,观察输出结果。

在order_info中修改一条数据,观察输出结果。

在这里插入图片描述

6 完整代码

(1)主程序

public class BaseDBApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 基本环境准备
        //流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(4);

//        //TODO 2 检查点设置
//        //开启检查点
//        env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);
//        // 设置检查点超时时间
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);
//        // 设置重启策略
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
//        // 设置job取消后,检查点是否保留
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
//        // 指定操作HDFS的用户
//        System.setProperty("HADOOP_USER_NAME","hzy");

        //TODO 3 从kafka中读取数据
        //声明消费的主题以及消费者组
        String topic = "ods_base_db_m";
        String groupId = "base_db_app_group";
        // 获取消费者对象
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        // 读取数据,封装成流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 4 对数据类型进行转换 String -> JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        //TODO 5 简单的ETL
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(
                new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonobj) throws Exception {
                        boolean flag =
                                jsonobj.getString("table") != null &&
                                        jsonobj.getString("table").length() > 0 &&
                                        jsonobj.getJSONObject("data") != null &&
                                        jsonobj.getString("data").length() > 3;
                        return flag;
                    }
                }
        );
//        filterDS.print("<<<");

        //TODO 6 使用FlinkCDC读取配置表数据
        //获取dataSource
        Properties props = new Properties();
        props.setProperty("scan.startup.mode","initial");
        SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                // 可配置多个库
                .databaseList("gmall2022_realtime")
                ///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
                //注意:指定的时候需要使用"db.table"的方式
                .tableList("gmall2022_realtime.table_process")
                .debeziumProperties(props)
                .deserializer(new MyDeserializationSchemaFunction())
                .build();

        // 读取数据封装流
        DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);

        // 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去
        // 想要使用广播状态,状态描述器只能是map,使用map状态存储
        MapStateDescriptor<String, TableProcess> mapStateDescriptor =
                new MapStateDescriptor<>("table_process", String.class, TableProcess.class);

        BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);

        // 调用非广播流的connect方法,将业务流与配置流进行连接
        BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);

        //TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中
        //声明维度侧输出流的标记
        OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};
        SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
                new TableProcessFunction(dimTag,mapStateDescriptor)
        );
        // 获取维度侧输出流
        DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);
        realDS.print("事实数据:");
        dimDS.print("维度数据:");

        //TODO 8 将维度侧输出流的数据写到Hbase中

        //TODO 9 将主流数据写回kafka的dwd层

        env.execute();
    }
}

(2)TableProcessFunction

/**
 * 实现动态分流
 * 目前流中有两条流中的数据,使用以下两个方法分别进行处理
 */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    // 声明维度侧输出流标签
    private OutputTag<JSONObject> dimTag;
    // 声明广播状态描述器
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.dimTag = dimTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    // 处理业务流中的数据,maxwell从业务数据库中采集到的数据
    @Override
    public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        String table = jsonObj.getString("table");
        String type = jsonObj.getString("type");

        // 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insert
        if (type.equals("bootstrap-insert")){
            type = "insert";
            jsonObj.put("type",type);
        }

        // 拼接key
        String key = table  + ":" + type;
        // 获取状态
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // 从状态中获取配置信息
        TableProcess tableProcess = broadcastState.get(key);

        if (tableProcess != null){
            // 在配置表中找到了该操作对应的配置
            // 判断是事实数据还是维度数据
            String sinkTable = tableProcess.getSinkTable();
            jsonObj.put("sink_table",sinkTable);

            String sinkType = tableProcess.getSinkType();
            if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
                // 维度数据,放到维度侧输出流中
                ctx.output(dimTag,jsonObj);
            }else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
                // 事实数据,放到主流中
                out.collect(jsonObj);
            }
        }else {
            // 在配置表中没有该操作对应的配置
            System.out.println("No This Key In TableProcess:" + key);
        }
    }

    // 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息
    @Override
    public void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
        // 获取状态
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // 将json格式字符串转换为JSON对象
        JSONObject jsonObj = JSONObject.parseObject(jsonStr);
        // 获取配置表中的一条配置信息
        // parseObject:将json格式字符串转化为json格式对象
        // 第二个参数为将json字符串转化为何种格式的对象
        TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);
        // 业务数据库表名
        String sourceTable = tableProcess.getSourceTable();
        // 操作类型
        String operateType = tableProcess.getOperateType();
        // 数据类型 hbase -- 维度数据    kafka -- 事实数据
        String sinkType = tableProcess.getSinkType();
        // 指定输出目的地
        String sinkTable = tableProcess.getSinkTable();
        // 主键
        String sinkPk = tableProcess.getSinkPk();
        // 指定保留字段(列)
        String sinkColumns = tableProcess.getSinkColumns();
        // 指定建表扩展语句
        String sinkExtend = tableProcess.getSinkExtend();

        // 将配置信息放到状态中
        // 拼接key
        String key = sourceTable + ":" + operateType;
        broadcastState.put(key,tableProcess);
    }
}

(3)MyDeserializationSchemaFunction

public class MyDeserializationSchemaFunction  implements DebeziumDeserializationSchema<String> {
    // 反序列化
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 导入的是org.apache.kafka.connnect.data包
        Struct valueStruct = (Struct) sourceRecord.value();
        // 获取数据的来源
        Struct afterStruct = valueStruct.getStruct("after");
        // 获取数据库和表名的来源
        Struct sourceStruct = valueStruct.getStruct("source");
        // 获取数据库
        String database = sourceStruct.getString("db");
        // 获取表名
        String table = sourceStruct.getString("table");
        // 获取操作类型
//        String op = valueStruct.getString("op");
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if(type.equals("create")){
            type = "insert";
        }
        JSONObject jsonObj = new JSONObject();
        jsonObj.put("database",database);
        jsonObj.put("table",table);
        jsonObj.put("type",type);
        // 获取影响的数据
        // 删除时,afterStruct为空
        JSONObject dataJsonObj = new JSONObject();
        if (afterStruct != null){
            // schema获取源数据的格式,fields获取里面的各个元素
            for (Field field : afterStruct.schema().fields()) {
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(field);
                dataJsonObj.put(fieldName,fieldValue);
            }
        }
        // 删除操作会使得data属性不为空,但size为0
        jsonObj.put("data",dataJsonObj);

        // 向下游发送数据
        collector.collect(jsonObj.toJSONString()) ;
    }

    // 指定类型
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

(4)总结

目前已完成功能:

  • 基本环境准备
  • 读取主流业务数据
  • 使用CDC读取配置表中数据
  • 将读取的配置数据流转换为广播流
  • 主流和广播流进行连接
  • 对连接之后的流进行处理(动态分流 – TableProcessFunction)
    • 处理广播流的方法 – processBroadcastElement
      • 获取状态
      • 从广播流中获取配置信息,将配置信息放到状态中
    • 处理主流的方法 – processElement
      • 获取状态
      • 从主流中获取表以及操作类型,根据表和操作类型封装key
      • 通过key到状态中获取配置对象
      • 根据配置对象的sinkType进行分流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90864.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

事务的隔离级别

目录 1.1 数据并发问题 1.2 SQL中的四种隔离级别 1.3 MySQL支持的四种隔离级别 1.4 如何设置事务的隔离级别 MySQL是一个 客户端&#xff0f;服务器 架构的软件&#xff0c;对于同一个服务器来说&#xff0c;可以有若干个客户端与之连接&#xff0c;每 个客户端与服务器连接…

简说四种架构的通用思维

一.自顶向下构建架构 ​1.首先定义问题&#xff0c;而定义问题中最重要的是定义客户的问题&#xff0c;特别主要识别出关键问题&#xff0c;关键问题是对客户有体感&#xff0c;能够解决客户痛点&#xff0c;通过一定的数据化来衡量识别出来&#xff0c;关键问题要优先给出解决…

我从“校园小白”到仿真“职场小达人”的CFD学习史

CFD属于CAE技术中比较难的一款软件&#xff0c;主要原因就是流体的特点决定了&#xff0c;因为相比于固体&#xff0c;流体太容易变形了&#xff0c;分子之间的距离比较大。所以&#xff0c;导致了一系列的问题。常常的结果是&#xff0c;忙了半天或者很长&#xff0c;根本得不…

Spring Security自定义认证逻辑实现图片验证码登录

前言 相信大家在网上冲浪都遇到过登录时输入图片验证码的情况&#xff0c;既然我们已经学习了 Spring Security&#xff0c;也上手实现过几个案例&#xff0c;那不妨来研究一下如何实现这一功能。 首先需要明确的是&#xff0c;登录时输入图片验证码&#xff0c;属于认证功能…

【录用案例】计算机电子类SCI,仅1个月15天录用

【期刊简介】IF&#xff1a;1.0-2.0&#xff0c;JCR4区&#xff0c;中科院4区 【检索情况】SCI在检&#xff0c;正刊 【征稿领域】自主传感器网络的高级接口电路及其应用 【参考周期】2-3个月左右 重要时间节点&#xff1a; 2022.12.15 | Accepted 2022.11.22 | 提交返修稿 20…

设计模式之建造者模式

builder desigin pattern 建造者模式的概念、建造者模式的结构、建造者模式的优缺点、建造者模式的使用场景、建造者模式的实现示例、建造者模式的源码分析 1、建造者模式的概念 将一个复杂对象的构建和表示分离&#xff0c;使得同样的创建过程可以得到不同的表示。其主要特点…

一种高复用的组件式安装包制作系统

目录 整体设计 流程描述 文件目录结构设计 产品资源的配置与更新 安装包制作流程 安装包执行流程 整体设计 如下展示了安装包系统的整体结构&#xff1a; 将集群布署可能用到的docker资源&#xff0c;按最小的单元进行整理&#xff0c;以压缩包的形式放于资源库&#xf…

欧姆龙电气元器件要点14讲

对于一个电气工程师来说&#xff0c;不仅仅只是要会PLC、伺服控制、变频器参数调优和总线控制、触摸屏这些大的元件的使用&#xff0c;还有很多种类齐全、功能各异的电气元器件&#xff0c;它们的作用、原理、维护方法、安全要点都要熟记于心&#xff0c;牢牢掌握。 第一章 电气…

艾美捷CpG ODN——ODN 1720 (TLRGRADE)说明书

艾美捷CpG ODN系列——ODN 1720 (TLRGRADE)&#xff1a;具有硫代磷酸酯骨架的GpC寡脱氧核苷酸。 艾美捷CpG ODN 丨ODN 1720 (TLRGRADE)化学性质&#xff1a; 序列&#xff1a;5-tccatgagcttcctgatgct-3&#xff08;小写字母表示硫代磷酸酯键&#xff09;。 MW&#xff1a;638…

Java中的运算符

算术运算符&#xff1a;&#xff0c; -&#xff0c;*&#xff0c;/&#xff0c;%&#xff0c;&#xff0c;--关系运算符&#xff1a;&#xff0c;!&#xff0c;<&#xff0c;>&#xff0c;<&#xff0c;>逻辑运算符&#xff1a;&&&#xff0c;||&#xff0…

基于STM32的温度控制系统

提示&#xff1a;记录毕设 文章目录前言一、任务书1.1设计(研究)目标:1.2设计(研究)内容:二、代码思路三、硬件四、联系我们五、设计六、框图代码等资料喜欢请点赞哦&#xff01;前言 基于STM32的温度控制系统&#xff0c;主控使用STM32F103ZET6&#xff0c;在正点原子的精英板…

ubuntu18.0 调节显卡GPU涡轮风扇转速

前言&#xff1a; 在炼丹的时候发现涡轮显卡的温度已经很高了85摄氏度&#xff0c;但是涡轮的风扇转速还不到65%&#xff0c;此时显卡计算频率明显已经下降了&#xff0c;所以需要手动调节风扇的转速&#xff0c;让噪音和计算速度处于均衡状态。 一、准备工作 》》安装显卡驱…

html:自定义网页右键菜单

<div id"menu"><divclass"menu-item"data-id"1">功能1</div><divclass"menu-item"data-id"2">功能2</div><divclass"menu-item"data-id"3">功能3</div><…

股票购买接口委托下单c++代码

炒股并非是运气可以驱使的&#xff0c;买股票不是赌博&#xff0c;是一种有风险的经济投资。在股市投资生涯中&#xff0c;掌握一门实战买卖技巧是我们必备的武器&#xff0c;这也是我们能长久在股市投资中得以生存的技法。 其实做股票投资是非常讲究买入和卖出的时机的。一只…

图像风格迁移-DSTN

样式传输的目的是从参考图像中再现具有样式的内容图像。现有的通用风格转换方法成功地以艺术或照片逼真的方式将任意风格传递给原始图像。然而&#xff0c;现有作品所定义的“任意风格”的范围由于其结构限制而在特定领域内受到限制。具体而言&#xff0c;根据预定义的目标域来…

用vscode配置C++3种编译器及多文件编译

末尾附上最终的模板 Vscode开发环境配置 C有很多种编译器&#xff0c;最重要的有三种 GNU的GCC(推荐)微软的MSVCClang/LLVM C的最新标准是C23,各个编译器对C各个标准的支持情况是不同的&#xff1a; C compiler support - cppreference.com 注意主要看C20的支持情况 用Vsco…

校园进销存网站

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 功能模块包括&#xff1a;员工模块、手机类型模块、供应商模块、采购模块、客户模块、销售模块、统计模块、库存模块 (1…

2022年的最后一个Win11 Dev预览版本

今日凌晨&#xff0c;微软向广大Win11 Dev用户推送了今年的最后一个版本更新&#xff0c;版本号为25267。根据官方的变化&#xff0c;引入了改变任务栏中可用搜索框样式的设置。此外&#xff0c;此版本还对任务栏、文件资源管理器、设置等进行了各种增强。 该公司还表示&#x…

nacos使用教程及原理简介

一、什么是 Nacos Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集&#xff0c;帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。 Nacos的关键特性包括&#xff1a; 服务发现和服务健康监测动态配置服务动态 DNS 服务服务及其元…

java排序算法

目录 一 冒泡排序 二 选择排序 三 插入排序 四 希尔排序 五 快速排序 5.1 单边循环快速排序 5.2 双边循环快速排序 六 二分查找 七 总结 一 冒泡排序 依次比较数组中相邻的两个元素&#xff0c;若 arr[i] > arr[i1]&#xff0c;则交换两个元素&#xff0c;两两都比…