Flink 实时数仓(二)【DIM 层搭建】

news2025/1/8 5:43:25

1、DIM 层搭建

1.1、设计要点

DIM层设计要点:

  • DIM层存的是维度表(环境信息,比如人、场、货等)
  • DIM层的数据存储在 HBase 表中
  • DIM层表名的命名规范为dim_表名

DIM 层表是用于维度关联的,要通过主键(维度外键)去获取相关维度信息,这种场景下 K-V 类型数据库的效率较高。常见的 K-V 类型数据库有 Redis、HBase,而 Redis 的数据常驻内存,会给内存造成较大压力,所以这里选用 HBase 存储维度数据。

1.2、设计分析

在 ODS 层,我们需要首先把所有维表全量同步一次,之后当事实数据来了的时候就可以直接关联;现在 DIM 层,我们需要考虑的问题就是,如何把维表信息保存到 HBase:

  • 读取数据
    • Kafka 的 topic_db 主题中(包含所有46张业务表)
    • 可以使用 Flink 的 Kafka 连接器读取
  • 过滤数据
    • 从 46 张业务表中过滤出所有维表数据
      • 在代码中写死十几张维度表的表名
      • 问题:如果增加维表就需要修改代码重新编译项目,重启任务
      • 思考:如何不修改代码且不重启任务?
        • 1)定时任务:每隔一段时间加载一次配置信息(实时性不好,不可取)
        • 2)监控配置信息:
          • MySQL binlog:配置信息写到 MySQL,然后使用 FlinkCDC 监控直接创建流(数据流 connect 配置流,将配置流中的信息写入到状态中,然后数据流把状态中存在的表过滤出来
          • 文件:Flume tailDir Source -> Kafka -> Flink 的 kafka source 消费创建流(太复杂了,不可取)

注意:多个并行度下,配置流中的数据(需要过滤的表名)会被分配给多个相同的算子处理,会导致并行算子之间的状态不一致,可能导致数据丢失的问题;这就需要把配置流做成一个广播流来和数据流进行 connect,这样写入并行算子的状态就是一致的了;

        广播流的缺点就是存在冗余,而且并行度越大冗余也越大;配置信息小点还好,如果配置信息很大,那么将会占用的资源页越大;这种情况下我们的解决办法就是分流:

        对数据流和配置流都按照表名进行 keyby,相同的 key 再去做 connect,但是这种方案会产生数据倾斜;

  • 写出数据
    • 使用 Phoenix 写出到 HBase(使用 JdbcSink,如果不行就自定义 Sink)

1.3、DIM 层实现

DIM 层的主要任务:读取 Kafka 的数据 -> 简单ETL ->  保存到HBase

        Maxwell 同步过来的数据是到 Kafka 的,我们通过 Flink 自带的 Kafka 连接器进行连接读取,然后对数据先进行简单的 ETL,比如

  • 删除掉非 JSON 格式的数据(这里因为是业务数据所以一般不会有非JSON的情况出现,但是日志数据可能会存在这种情况)
  • 删除掉 type 为 bootstrap-start 和 bootstrap-complete 的数据;
  • 删除掉 type 为 delete 的数据;

最后,通过 Phoenix API 将数据插入到 HBase;

1.3.1、读取 Kafka 中的数据

        读取 Kafka 中的数据为的是创建主流,这里设计到的一个重点就是:Flink 作为消费者在从 Kafka 消费的时候需要对数据进行反序列化,而在反序列时如果使用 Flink 默认的 Kafka 反序列化器(FlinkKafkaConsumer)进行消费的话,可能会出现空指针异常:

        可以看到,反序列化方法中是直接把 kafka message 创建为一个 String 对象,但是 String 的构造器源码中明确声明构造参数不可为 null,而我们的 message 又不可避免存在一些空值,所以这里我们需要重写 FlinkKafkaConsumer 的反序列化方法:

public class MyKafkaUtil {

    private static final String KAFKA_SERVER = "hadoop102:9092";

    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic,String groupId){

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);

        return new FlinkKafkaConsumer<String>(
                topic,
                // 反序列化格式
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false; // 无界流所以返回 false
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null){
                            return null;
                        }else {
                            return new String(record.value());
                        }
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                },
                properties
        );
    }
}

这样,我们就可以直接使用通过 Kafka 地址主题 和 组id 使用 Flink 对 Kafka 的主题进行消费:

env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

至此,我们的主流已经创建完毕; 

1.3.2、简单 ETL 

        这一步主要为的是将不必要的数据移除掉,比如非JSON数据(日志数据中才可能出现)、maxwell 的脏数据( bootstrap-start 、bootstrap-complete 和 delete)这种无意义的信息;

public class DimApp {

    public static void main(String[] args) {
        // TODO 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 生产环境中设置为kafka主题的分区数

        // 1.1 开启checkpoint
        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次

        // 1.2 设置状态后端
        env.setStateBackend(new HashMapStateBackend());

        // TODO 2. 读取Kafka的topic_db主题,创建主流
        String topic = "topic_db";
        String groupId = "test";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // TODO 3. ETL
        // TODO 3.1 过滤掉非JSON数据以及Maxwell的脏数据
        SingleOutputStreamOperator<JSONObject> filterJsonDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    // 过滤非JSON数据
                    JSONObject jsonObject = JSON.parseObject(value);

                    String type = jsonObject.getString("type");
                    if ("bootstrap-insert".equals(type) || "insert".equals(type) || "update".equals(type)) {
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    System.out.println("发现脏数据" + value);
                }
            }
        });

        // ...
    }
}

关于 Flink 的并行度,生产中一般设置为 Kafka 的分区数量(使消费者数量 = 主题分区数量),而不是机器的 CPU核数(机器总不能只跑这一个任务)!

1.3.3、动态增删维表

        现在我们需要从 46 张业务表中过滤出需要的维表,而且这个维表并不是写死的,很可能会出现新增和删除,所以我们希望做到不修改代码且不重启服务的情况下实现,我们上面业说过了,最好使用监控配置信息的方式,一共有三种解决方案:

  • MySQL binlog
    • 也就是把配置信息做成表格,使用 FlinkCDC 实时监控,使用双流联结(主流和配置流),配置流把配置信息(需要同步的维度表信息,包括业务系统中的维表名、写入到phoenix的表名、字段、主键、额外信息等)写入到状态当中,然后主流再去状态中读取并处理;
  • 文件:使用 Flume 的 tailDir source 实时监听文件内容,写到 Kafka 当中,Flink 再从 Kafka 去读,这种方式太复杂了,一般不用
  • Zookeeper:通过 Zookeeper 的 watch 机制将配置信息写入到一个 znode 节点,同样比较复杂

综上,我们一般选择第一种方案:

1)创建配置表
CREATE TABLE `table_process` (
  `source_table` varchar(200) NOT NULL COMMENT '业务系统来源表', -- mysql业务系统中的表名
  `sink_table` varchar(200) DEFAULT NULL COMMENT 'phoenix输出表', -- phoenix的表名
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT 'phoenix建表所需字段', -- 建表需要的字段,过滤主流数据字段
  `sink_pk` varchar(200) DEFAULT NULL COMMENT 'phoenix建表的主键字段', -- 建表使用的主键(表名做主键)
  `sink_extend` varchar(200) DEFAULT NULL COMMENT 'phoenix建表扩展', -- 比如预分区等信息
  PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

这里,我们将 source_table 作为配置表的主键,这样可以通过它获取到该表需要同步的字段、主键和建表扩展,从而得到完整的 Phoenix 建表语句。

2)创建配置表的实体类

配置流在创建的时候我们并不会直接把它转为 JSON 格式,毕竟我们还要对它进行一些处理,而操作java对象比json对象要更容易;

@Data
public class TableProcess {
    //来源表
    String sourceTable;
    //输出表
    String sinkTable;
    //输出字段
    String sinkColumns;
    //主键字段
    String sinkPk;
    //建表扩展
    String sinkExtend;
}
3)使用 FlinkCDC 创建配置流

注意:FlinkCDC 把 binlog 读取过来会转为 json 格式

        // TODO 4. 使用FlinkCDC读取MySQL配置信息表创建配置流
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall")
                .tableList("gmall.table_config")
                .startupOptions(StartupOptions.initial()) // 全部读取
                .deserializer(new JsonDebeziumDeserializationSchema()) // flink读取binlog会把它转为json格式,所以这里需要一共json的反序列化方式
                .build();

        DataStreamSource<String> mysqlSourceDS = env.fromSource(
                mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MysqlSource"
        );
4)配置流形成广播流

配置流不能直接和主流联结,会造成数据丢失(多并行度下,配置信息会轮询发送到相同的算子上),所以我们需要把它转为广播流;

创建广播流需要传入一个 Map 类型的状态描述器:

  • K 必须是主流和配置流都有的信息,这样主流才能和广播流产生关联,所以这里我们使用表名做为 K;
  • V 是配置流中的数据,这里我们选择上面自定义 TableProcess 对象,这个对象包含了该表(K)的所有配置信息;
        // TODO 5. 将配置流处理为广播流
        // K的要求: 1.必须是主流和配置流都有的字段 2. 唯一
        // V: 这里的v应该是配置流中的数据,但是为了方便过滤字段(操作java对象比json对象要容易),所以这里转为Java对象
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        // 这里的泛型是广播流的类型
        BroadcastStream<String> configBroadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);
5)双流联结并处理

这里对主流和配置流联结后需要进行处理:

  • 把配置流中的配置信息写入到状态后端使其自动广播
    • 从配置流(FlinkCDC 读取过来 json 信息)中提取出表格信息
    • 校验 phoenix 表格(不存在就创建)
    • 写入到状态中(因为现在是广播流所以会自动广播)
  • 把主流中的非维表去除掉,以及维表中不需要的字段
    • 从状态中获得配置信息
    • 除去非维表并过滤字段,只留下配置流当中存在的字段
    • 给主流添加上配置信息中的 sinkTable 字段(因为主流不知道最终向哪个phoenix表写入)
        // TODO 6. 连接主流和广播流
        // 这里的泛型: 1. 非广播流的数据类型 2. 广播流的数据类型
        BroadcastConnectedStream<JSONObject, String> connectedStream = filterJsonDS.connect(configBroadcastStream);

        // TODO 7. 处理连接流,根据配置信息处理主流数据
        // 得到维表数据流(已经把配置流中不需要的维表字段过滤掉了,以及非维表也被过滤掉了)
        SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));

这里的 TableProcessFunction 是我们自定义的广播流处理函数:

// 这里的泛型: 1. 非广播流的数据类型 2. 广播流的数据类型 3. 输出类型这里选择主流的数据类型,因为毕竟我们用配置流的目的就是为了得到过滤后的主流数据
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private Connection connection;
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    public TableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor){
        this.mapStateDescriptor = mapStateDescriptor;
    }

    // 保证每个并行度创建一个连接
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    @Override
    public void close() throws Exception {
        connection.close();
    }

    /**
     * value 的值:
     * {
     *  "before":null,
     *  "after":{
     *      "source_table":"aa",
     *      "sink_table":"bb",
     *      "sink_columns":"cc",
     *      "sink_pk":"id",
     *      "sink_extend":"xxx"},
     *  "source":{
     *      "version":"1.5.4.Final",
     *      "connector":"mysql",
     *      "name":"mysql_binlog_source",
     *      "ts_ms":1652513039549,
     *      "snapshot":"false",
     *      "db":"gmall-211126-config",
     *      "sequence":null,
     *      "table":"table_process",
     *      "server_id":0,
     *      "gtid":null,
     *      "file":"",
     *      "pos":0,
     *      "row":0,
     *      "thread":null,
     *      "query":null},
     *   "op":"r",
     *   "ts_ms":1652513039551,
     *   "transaction":null}
     */
    @Override
    public void processBroadcastElement(String value, Context context, Collector<JSONObject> collector) throws Exception {
        // TODO 1. 获取并解析数据,方便主流操作(把 "after" 字段的内容解析为 TableProcess 对象)
        JSONObject jsonObject = JSONObject.parseObject(value);
        TableProcess tableProcess = JSONObject.parseObject(jsonObject.getString("after"), TableProcess.class);
        // TODO 2. 校验表在phoenix中是否存在
        checkTable(tableProcess.getSinkTable(),
                tableProcess.getSinkColumns(),
                tableProcess.getSinkPk(),
                tableProcess.getSinkExtend());

        // TODO 3. 写入状态,广播出去
        BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
        broadcastState.put(tableProcess.getSourceTable(),tableProcess);
    }

    /**
     * 校验并创建phoenix表: create table if not exists db.tb(xx varchar primary key,xx varchar, ...) xxx
     * @param sinkTable 表名
     * @param sinkColumns 字段
     * @param sinkPk 主键
     * @param sinkExtend 扩展字段
     */
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {

        PreparedStatement preparedStatement = null;

        try {
            // 处理特殊字段值(null)
            if (sinkPk == null || "".equals(sinkPk))
                sinkPk = "id";
            if (sinkExtend == null)
                sinkExtend = "";

            // 拼接 SQL
            StringBuilder sql = new StringBuilder("create table if not exists ")
                    .append(GmallConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append("(");
            String[] columns = sinkColumns.split(",");
            for (int i = 0; i < columns.length-1; i++) {
                sql.append(" ").append(columns[i]).append(" varchar");
                if (columns[i].equals(sinkPk))
                    sql.append(" primary key");
                sql.append(",");
            }
            sql.append(columns[columns.length-1])
                    .append(") ")
                    .append(sinkExtend);

            // 编译 SQL
            preparedStatement = connection.prepareStatement(sql.toString());

            // 执行 SQL
            preparedStatement.execute();
        }catch (SQLException e){
            // 要停止程序必须使用运行时异常而不是编译时异常
            throw new RuntimeException("建表失败 " + sinkTable);
        }finally {
            // 释放资源
            if (preparedStatement != null){
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

    }


    // 主流数据是一个json格式(maxwell数据)
    /**
     * {
     *  "database":"gmall-211126-flink",
     *  "table":"base_trademark",
     *  "type":"bootstrap-insert",
     *  "ts":1652499295,
     *  "data":{
     *      "id":1,
     *      "tm_name":"三星",
     *      "logo_url":"/static/default.jpg"
     *      }
     * }
     */
    @Override
    public void processElement(JSONObject jsonObject, ReadOnlyContext readOnlyContext, Collector<JSONObject> collector) throws Exception {
        // TODO 1. 获取广播的配置数据
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = readOnlyContext.getBroadcastState(mapStateDescriptor);
        // 如果返回 null 说明不是维表
        TableProcess tableProcess = broadcastState.get(jsonObject.getString("table"));

        // TODO 2. 过滤字段,只留下配置流当中存在的字段
        if (tableProcess == null) return;
        filterColumns(jsonObject.getJSONObject("data"),tableProcess.getSinkColumns());

        // TODO 3. 补充 SinkTable 字段(因为主流中是不包含phoenix表名的)
        jsonObject.put("sinkTable",tableProcess.getSinkTable());
        collector.collect(jsonObject);
    }

    /**
     * 过滤字段: 将主流中配置流中有的字段留下,其它字段删除
     * @param data 可能是维表,也可能是事实表 {"database":"gmall-211126-flink","table":"base_trademark","type":"bootstrap-insert","ts":1652499295,"data":{"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}}
     * @param sinkColumns phoenix 列名
     */
    private void filterColumns(JSONObject data, String sinkColumns) {
        // 把 JSONObject 当做 Map 处理即可
        String[] split = sinkColumns.split(",");
        Set<String> phoenix_columns = new HashSet<>(Arrays.asList(split));
        for (String column : data.keySet()){
            if (!phoenix_columns.contains(column)){
                data.remove(column);
            }
        }
        // 简写: data.entrySet().removeIf(entry -> !phoenix_columns.contains(entry.getKey()));
    }

}

至此,我们就得到了最终等待写入到 HBase 的流 dimDS;

1.3.4、写入 Phoenix

        上面我们在连接 phoenix 校验表格的时候用的是 jdbc 来访问的,而 Flink 也提供了 JdbcSink 连接器,那这里我们能不能使用呢?

        其实这里使用 JdbcSink 是可以的,但是不推荐,因为 JdbcSink 适合的是单表写入的场景,而我们的 dimDS 数据流中存放的是多个维度表的数据,这就要求当数据来的时候,我们要根据不同的表生成不同的 SQL,而这里的 addSink 方法中的 sql 语句必须是先给定的,尽管不确定的表名、字段名等可以使用占位符,但是我们不能保证所有维表的字段数量都是一样的;所以,这种方式显然不可取,那我们就只能自定义一个 Sink 了:

1)创建 Druid 连接池

Phoenix 是支持 JDBC 协议,这里为了方便连接管理我们使用 Druid 来创建连接池;

public class DruidDSUtil {
    private static DruidDataSource druidDataSource = null;

    public static DruidDataSource createDataSource() {
        // 创建连接池
        druidDataSource = new DruidDataSource();
        // 设置驱动全类名
        druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);
        // 设置连接 url
        druidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);
        // 设置初始化连接池时池中连接的数量
        druidDataSource.setInitialSize(5);
        // 设置同时活跃的最大连接数
        druidDataSource.setMaxActive(20);
        // 设置空闲时的最小连接数,必须介于 0 和最大连接数之间,默认为 0
        druidDataSource.setMinIdle(1);
        // 设置没有空余连接时的等待时间,超时抛出异常,-1 表示一直等待
        druidDataSource.setMaxWait(-1);
        // 验证连接是否可用使用的 SQL 语句
        druidDataSource.setValidationQuery("select 1");
        // 指明连接是否被空闲连接回收器(如果有)进行检验,如果检测失败,则连接将被从池中去除
        // 注意,默认值为 true,如果没有设置 validationQuery,则报错
        // testWhileIdle is true, validationQuery not set
        druidDataSource.setTestWhileIdle(true);
        // 借出连接时,是否测试,设置为 false,不测试,否则很影响性能
        druidDataSource.setTestOnBorrow(false);
        // 归还连接时,是否测试
        druidDataSource.setTestOnReturn(false);
        // 设置空闲连接回收器每隔 30s 运行一次
        druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
        // 设置池中连接空闲 30min 被回收,默认值即为 30 min
        druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);

        return druidDataSource;
    }
}
2)Phoenix 工具类

为了方法的复用性,我们把写入 Phoenix 的方法抽出来:

public class PhoenixUtil {

    /**
     * 将主流数据写入 phoenix
     * @param connection phoenix连接
     * @param sinkTable 表名
     * @param data 数据
     * @throws SQLException 这里的异常直接抛出去.因为工具类中的方法是给大家公用的,而不同的业务捕获到异常的处理方案是不一样的
     * 所以这里把处理异常的权利交给每个调用该方法的人
     */
    public static void upsertValues(DruidPooledConnection connection, String sinkTable, JSONObject data) throws SQLException {
        // 1. 拼接 SQL: upsert into db.tb(id,name,sex) values ('1001','zhangsan','man')
        Set<String> columns = data.keySet();
        Collection<Object> values = data.values();
        String sql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "("
                + StringUtils.join(columns,",") +") values ( '"
                + StringUtils.join(values,"','") + "')";

        // 2. 预编译 SQL
        PreparedStatement preparedStatement = connection.prepareStatement(sql);

        // 3. 执行
        preparedStatement.execute();
        connection.commit();

        // 4. 释放资源
        preparedStatement.close(); // connection 在 Sink 的 invoke 里面关
    }
}
3)自定义 Sink

自定义 Sink ,在 open 方法中获得连接,在 invoke 中执行插入数据到 Phoenix ,然后回收连接;

// 输入数据类型应该是 dimDS 的类型,也就是主流类型 JSONObject
public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    private DruidDataSource druidDataSource = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        druidDataSource = DruidDSUtil.createDataSource();
    }

    /**
     * value:
     * {
     *  "database":"gmall-211126-flink",
     *  "table":"base_trademark",
     *  "type":"bootstrap-insert",
     *  "ts":1652499295,
     *  "data":{
     *      "id":1,
     *      "tm_name":"三星"
     *      },
     *  "sinkTable": "dim_xxx"
     * }
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // 获取连接
        DruidPooledConnection connection = druidDataSource.getConnection();
        // 写出数据(需要知道写出的表名、字段)
        String sinkTable = value.getString("sinkTable");
        JSONObject data = value.getJSONObject("data");
        // 如果插入数据失败 invoke 方法抛出的 Exception 会导致程序停止
        PhoenixUtil.upsertValues(connection,sinkTable,data);
        // 归还连接
        connection.close();
    }
}

总结

        至此,DIM 层搭建完毕,在离线数仓的 DIM 层中,它需要在 ODS 层的基础上抽取出主维表和相关维表,然后主维表通过 left join 相关维表得到最终的 dim 层的维表;而实时数仓中我们主要是通过 Flink 代码来对数据流进行实时处理,代码的编写确实比 SQL 更有意思;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1965871.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Chapter 22 数据可视化——折线图

欢迎大家订阅【Python从入门到精通】专栏&#xff0c;一起探索Python的无限可能&#xff01; 文章目录 前言一、Pyecharts介绍二、安装Pyecharts三、全局配置项四、绘制折线图 前言 在大数据时代&#xff0c;数据可视化成为了分析和展示数据的重要手段。Pyecharts 是一个基于 …

微信小程序-获取手机号:HttpClientErrorException: 412 Precondition Failed: [no body]

问题&#xff1a; 412 异常就是你的请求参数获取请求头与服务器的不符&#xff0c;缺少请求体&#xff01; 我的问题&#xff1a; 我这里获取微信手机号的时候突然给我报错142&#xff0c;但是代码用的是原来的代码&#xff0c;换了一个框架就噶了&#xff01; 排查问题&am…

esp-07s 模块的WIFI 联网和MQTT AT指令测试,固件下载更新方法

安信可官网: https://docs.ai-thinker.com/start 一、wifi 联网测试指令 版本&#xff1a;AT version: 1.2.0.0 //1.重启模块 ATRST//2.设置当前 Wi-Fi 模式&#xff0c;不保存到 flash ATCWMODE_CUR1//3.设置 DHCP&#xff0c;不保存到 flash ATCWDHCP_CUR1,1//4.上电是否…

AIGC大模型产品经理高频面试大揭秘‼️

近期有十几个学生在面试大模型产品经理&#xff08;薪资还可以&#xff0c;详情见下图&#xff09;&#xff0c;根据他们面试&#xff08;包括1-4面&#xff09;中出现高频大于3次的问题汇总如下&#xff0c;一共32道题目&#xff08;有答案&#xff09;。 29.讲讲T5和Bart的区…

使用PhotoMaker V2产生明星香水广告照片

PhotoMaker V2 是一个令人兴奋的工具&#xff0c;可以帮助您快速生成逼真的个性化人物照片。您只需提供一张或几张面部照片以及一个文本提示&#xff0c;即可在几秒钟内获得定制的照片或绘画&#xff0c;无需进行额外的训练。这个模型还可以与其他基于 SDXL 的基础模型或其他 L…

干货 | 2024中国联通算力网络安全白皮书(免费下载)

本白皮书以国家整体安全观为指导&#xff0c;充分发挥网络安全现代产业链链长的主体支撑和融通带动作用&#xff0c;提出算力网络“新质安全、共链可信”的安全愿景和“构建开放融合内生免疫弹性健壮网安智治的一体化安全”的安全目标。从运营商开展网络建设和应用部署的角度出…

安全编程:Rust示例强密码策略

一、什么是强密码策略&#xff1f; 强密码策略是一套旨在提高账户安全性的规则和建议。以下是一些创建和使用强密码的关键策略&#xff1a; 长度&#xff1a;密码应至少包含8个字符&#xff0c;更长的密码通常更安全。 复杂性&#xff1a;使用大小写字母、数字和特殊字符的组合…

OAuth2的四种认证方式

文章目录 客户端认证grant_typeclient_credential授权码认证access_token密码认证grant_typepassword刷新token认证其他认证 OAuth2是目前流行的认证协议&#xff0c;主要包含四种认证方式&#xff1a;客户端认证、密码认证、授权码认证、刷新令牌认证。 客户端认证grant_typec…

SpringBoot项目打包成war包

1. 项目场景 使用SpringBoot 开发项目&#xff0c;由于内置了Tomcat&#xff0c;所以项目可以直接启动&#xff0c;部署到服务器的时候&#xff0c;直接打成 jar 包&#xff0c;就可以运行了。 有时需要把项目打包放入外置的 Tomcat 或者 TongWeb 中运行&#xff0c;就需要把…

【Golang 面试 - 进阶题】每日 3 题(九)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…

搞懂收发模式(Transmit、Receive、IT、DMA、ToIdle、Abort、Callback)

搞懂收发模式&#xff08;Transmit、Receive、IT、DMA、ToIdle、Abort&#xff09; 文章目录 搞懂收发模式&#xff08;Transmit、Receive、IT、DMA、ToIdle、Abort&#xff09;1、阻塞模式&#xff08;阻塞轮询&#xff09;2、非阻塞模式&#xff08; IT &#xff09;3、直接内…

D盘根目录莫名出现 *.scratch 文件夹

不知道从什么时候开始&#xff0c;突然发现D盘根目录出现奇怪的空文件夹&#xff08;图一&#xff09;&#xff0c;一开始因为需求紧张没时间管&#xff0c;但是没几天就发现这个空文件夹越来越多&#xff0c;多到上百个了。 要是几个那还能忍忍&#xff0c;这种程度已经严重影…

IDEA优化配置,提高启动和运行速度

一、修改配置参数 IDEA默认启动配置主要考虑低配置用户&#xff0c;参数不高&#xff0c;导致 启动慢&#xff0c;然后运行也不流畅&#xff0c;这里我们需要优化下启动和运行配置&#xff1b; 找到idea安装的bin目录&#xff1b; 你的按照目录\IntelliJ IDEA 2018.2.2\bin …

客户现场电脑卡死

最近遇到一个很奇怪的现象,客户现场机台运行过程中,出现不规律的卡顿或假死蓝屏情况,软件分析显得异常重要. 首先我们从软件的运行状态,内存,cpu利用率等性能方面排查,通过观察,我们发现内存没有暴涨的情况,cpu利用率也不是很高,表现的现象就是不定时的出现软件假死,卡顿现象.…

SQL插入、更新和删除数据

SQL插入、更新和删除数据 一、直接向表插入数据 1.1、插入完整的行 这里所说的完整行指的是包含表内所有字段的数据行&#xff1b;假设表中有n个字段&#xff0c;则插入完整行的语法&#xff1a; INSERT INTO 表名或视图名 VALUES(字段1的值,字段2的值,字段3的值,...,字段n的…

JSONP跨域

1 概述 定义 json存在的意义&#xff1a; 不同类型的语言&#xff0c;都能识别json JSONP(JSON with Padding)是JSON的一种“使用模式”&#xff0c;可用于解决主流浏览器的跨域数据访问的问题。由于同源策略&#xff0c;一般来说位于 server1.example.com 的网页无法与不是 s…

MCU单片机GPIO初始化该按什么顺序配置?为什么初始化时有电平跳变?

GPIO初始化时有时钟配置、模式配置、输出配置、复用配置&#xff0c;那么在编写初始化代码时&#xff0c;到底该按什么顺序执行呢&#xff1f;如果顺序不当那初始化过程可能会出现短暂的电平跳变。 第一步&#xff0c;初始化MCU外设时&#xff0c;一般都需要先打开对应寄存器的…

Multi-Head Mixture-of-Experts笔记

这篇文章&#xff08;还是校友&#xff09;&#xff0c;也是和dot product本身没什么关系。讲一讲核心思想 文章在背景中介绍了Sparse Mixture of Experts&#xff0c;因为MH-MoE也是在S-MoE基础上做的&#xff0c;个人感觉其实变动并不大&#xff0c;但我觉得写的很清楚&…

优化 Spring Boot 项目启动速度:高效管理大量 Bean 注入

个人名片 🎓作者简介:java领域优质创作者 🌐个人主页:码农阿豪 📞工作室:新空间代码工作室(提供各种软件服务) 💌个人邮箱:[2435024119@qq.com] 📱个人微信:15279484656 🌐个人导航网站:www.forff.top 💡座右铭:总有人要赢。为什么不能是我呢? 专栏导…

卷积神经网络 - 卷积运算篇

序言 在探索深度学习尤其是计算机视觉领域的奥秘时&#xff0c;卷积神经网络&#xff08; Convolutional Neural Networks, CNNs \text{Convolutional Neural Networks, CNNs} Convolutional Neural Networks, CNNs&#xff09;无疑占据了核心地位。而卷积运算&#xff0c;作为…