数据仓库环境准备完整使用 (第四章)

news2024/11/18 17:53:43

数据仓库环境准备完整使用

  • 一、IDEA 开发环境准备
    • 1、创建项目gmall-realtime
    • 2、删除当前项目的src目录并创建gmall-realtime模块
    • 3、创建子项目
    • 4、导入依赖
    • 5、创建相关的包
    • 6、在 resources 目录下创建 log4j.properties 文件,写入如下内容
  • 二、数据仓库运行环境(ODS)
    • 1、Flink 环境搭建
    • 2、Hbase 环境搭建
      • 1)Hbase 集群部署
      • 2)IDEA Phoenix 环境准备
        • (1)引入 Phoenix Thick Client 依赖
        • (2)在 resources 目录下创建 hbase-site.xml 文件,并在文件中添加如下内容
    • 3) ClickHouse 环境搭建
    • 4)模拟数据准备(ODS)
      • (1) 用户行为日志
      • (2) 业务数据
        • (1)维度数据首日全量同步
        • (2)业务数据生成
  • 三、数仓开发之DIM层(DIM)
    • 1、总结概括下面优化方案总结
    • 2、配置表
      • 1) Flink CDC
      • 2) 配置表设计
        • (1) 字段解析
        • (2) 在Mysql中创建数据库建表并开启Binlog
    • 3、主要任务
      • 1) 接收Kafka数据,过滤空值数据
      • 2) 动态拆分维度表功能
      • 3) 把流中的数据保存到对应的维度表
    • 4、代码实现实现
      • 1) 接收Kafka数据,过滤空值数据
        • (1) 创建 KafkaUtil 工具类
        • (2) 主程序
      • 2) 根据MySQL的配置表,动态进行分流
        • (1) 导入依赖
        • (2) 创建配置表实体类
        • (3) 编写操作读取配置表形成广播流
        • (4) 定义一个项目中常用的配置常量类GmallConfig
      • 3) 自定义函数MyBroadcastFunction
        • (1) 定义类MyBroadcastFunction
        • (2) 自定义函数MyBroadcastFunction-open
        • (3) 自定义函数MyBroadcastFunction-processBroadcastElement
        • (4) 自定义函数MyBroadcastFunction-checkTable
        • (5) 自定义函数MyBroadcastFunction-processElement()
        • (6) 自定义函数MyBroadcastFunction-filterColumns(),校验字段,过滤掉多余的字段
        • (7) 主程序DimSinkApp中调用MyBroadcastFunction提取维度数据
      • 4) 保存维度到HBase(Phoenix)
        • (1) 程序流程分析
        • (2) 创建 PhoenixUtil 工具类,在其中创建insertValues()方法
        • (3) MyPhoenixSink
        • (4) 主程序 DimSinkApp 中调用 MyPhoenixSink
        • (5) 测试

一、IDEA 开发环境准备

1、创建项目gmall-realtime

在这里插入图片描述

2、删除当前项目的src目录并创建gmall-realtime模块

在这里插入图片描述

3、创建子项目

在这里插入图片描述

4、导入依赖

  <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--如果保存检查点到hdfs上,需要引入此依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <!-- 打包时不复制META-INF下的签名文件,避免报非法签名文件的SecurityExceptions异常-->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <!-- connector和ormat依赖的工厂类打包时会相互覆盖,需要使用ServicesResourceTransformer解决-->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

5、创建相关的包

在这里插入图片描述

6、在 resources 目录下创建 log4j.properties 文件,写入如下内容

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n

log4j.rootLogger=error,stdout

二、数据仓库运行环境(ODS)

1、Flink 环境搭建

Flink 集群部署请参考 之前的博客

2、Hbase 环境搭建

1)Hbase 集群部署

Flink 集群部署请参考 之前的博客

2)IDEA Phoenix 环境准备

Phoenix 安装参考本节1)中文档。IDEA Phoenix 环境准备操作如下:

(1)引入 Phoenix Thick Client 依赖

<dependency>
<groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>

(2)在 resources 目录下创建 hbase-site.xml 文件,并在文件中添加如下内容

在这里插入图片描述
hbase-size.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
    <name>hbase.regionserver.wal.codec</name>
    <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
    </property>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop102:8020/HBase</value>
    </property>

    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop102,hadoop103,hadoop104</value>
    </property>

    <property>
        <name>hbase.unsafe.stream.capability.enforce</name>
        <value>false</value>
    </property>

    <property>
        <name>hbase.wal.provider</name>
        <value>filesystem</value>
    </property>

    <!-- 注意:为了开启hbase的namespace和phoenix的schema的映射,在程序中需要加这个配置文件,另外在linux服务上,也需要在hbase以及phoenix的hbase-site.xml配置文件中,加>上以上两个配置,并使用xsync进行同步(本节1中文档已有说明)。-->
    <property>
        <name>phoenix.schema.isNamespaceMappingEnabled</name>
        <value>true</value>
    </property>

    <property>
        <name>phoenix.schema.mapSystemTablesToNamespace</name>
        <value>true</value>
    </property>

</configuration>

3) ClickHouse 环境搭建

ClickHouse 安装请参考之前的

4)模拟数据准备(ODS)

通常企业在开始搭建数仓时,业务系统中会存在历史数据,一般是业务数据库存在历史数据,而用户行为日志无历史数据。假定数仓上线的日期为2022-02-21,为模拟真实场景,需准备以下数据。

(1) 用户行为日志

用户行为日志,一般是没有历史数据的,故日志只需要准备2022-02-21一天的数据。具体操作如下:
(1)启动 Kafka。

(2)启动一个命令行 Kafka 消费者,消费 topic_log 主题的数据。

(3)修改两个日志服务器(hadoop102、hadoop103)中的
/opt/module/applog/application.yml配置文件,将mock.date参数改为2022-02-21。

(4)执行日志生成脚本lg.sh。

(5)观察命令行 Kafka 消费者是否消费到数据。

在这里插入图片描述

(2) 业务数据

实时计算不考虑历史的事实数据,但要考虑历史维度数据。因此要对维度相关的业务表做一次全量同步。

(1)维度数据首日全量同步

①修改Maxwell配置文件中的mock_date参数

[atguigu@hadoop102 maxwell]$ vim /opt/module/maxwell/config.properties

mock_date=2022-02-21

②启动业务数据采集通道,包括Maxwell、Kafka
③编写业务数据首日全量脚本

与维度相关的业务表如下

activity_info
activity_rule
activity_sku
base_category1
base_category2
base_category3
base_province
base_region
base_trademark
coupon_info
coupon_range
financial_sku_cost
sku_info
spu_info
user_info

切换到 /home/atguigu/bin 目录,创建 mysql_to_kafka.sh 文件

[atguigu@hadoop102 maxwell]$ cd ~/bin
[atguigu@hadoop102 bin]$ vim mysql_to_kafka_init.sh

在脚本中添加如下内容

#!/bin/bash

# 该脚本的作用是初始化所有的业务数据,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"activity_info")
  import_data activity_info 
  ;;
"activity_rule")
  import_data activity_rule 
  ;;
"activity_sku")
  import_data activity_sku 
  ;;
"base_category1")
  import_data base_category1 
  ;;
"base_category2")
  import_data base_category2 
  ;;
"base_category3")
  import_data base_category3 
  ;;
"base_province")
  import_data base_province 
  ;;
"base_region")
  import_data base_region 
  ;;
"base_trademark")
  import_data base_trademark 
  ;;
"coupon_info")
  import_data coupon_info 
  ;;
"coupon_range")
  import_data coupon_range 
  ;;
"financial_sku_cost")
  import_data financial_sku_cost 
  ;;
"sku_info")
  import_data sku_info 
  ;;
"spu_info")
  import_data spu_info 
  ;;
"user_info")
  import_data user_info 
  ;;
"all")
  import_data activity_info 
  import_data activity_rule 
  import_data activity_sku 
  import_data base_category1 
  import_data base_category2 
  import_data base_category3 
  import_data base_province 
  import_data base_region 
  import_data base_trademark 
  import_data coupon_info 
  import_data coupon_range 
  import_data financial_sku_cost 
  import_data sku_info 
  import_data spu_info 
  import_data user_info 
  ;;
esac

增加执行权限

[atguigu@hadoop102 bin]$ chmod +x mysql_to_kafka_init.sh

执行脚本

[atguigu@hadoop102 bin]$ mysql_to_kafka_inc_init.sh all

④启动 Kafka 消费者,观察数据是否写入 Kafka

[atguigu@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db

(2)业务数据生成

业务数据生成之前要将 application. properties 文件中的 mock.date 参数修改为业务时间,首日应设置为 2022-02-21。

mock.clear 和 mock.clear.user 均为 0,表示不重置业务数据和用户数据。如下
在这里插入图片描述

同时要保证 Maxwell 配置文件 config.properties 中的 mock.date 参数和 application. properties 中 mock.date 参数的值保持一致。如下。

在这里插入图片描述

三、数仓开发之DIM层(DIM)

1、总结概括下面优化方案总结

DIM层
	数据源:kafka---topic_db(包含所有的业务表)
	过滤数据:过滤出所需要的维表数据
		过滤条件:在代码中给定十几张维表的表名
		问题:如果增加维表,需要修改代码-重新编译-打包-上传、重启任务
			优化1:不修改代码、只重启任务
				配置信息中保存需要的维表信息、配置信息只在程序启动的时候加载一次
			优化2:不修改代码、不只重启任务
				方向:让程序在启动的以后还可以获取配置信息中增加的内容
				具体实施:
					1) 定时任务:每隔一段时间加载一次配置信息 
						将定时任务写在Open方法
					2) 监控配置信息: 一旦配置信息增加了数据,可以立马获取到
						(1) MySQLBinglog: FlinkCDC 监控直接创建流
							a.将配置信息处理成广播流:缺点-> 如果配置信息过大,冗余太多
							b.按照表名进行keyBy处理:缺点-> 有可能产生数据倾斜
						(2) 文件:Flum->kafka->Flink 消费创建流
			
	写出数据:将数据写出到Phoenix
		JdbcSink、自定义Sink

DIM层设计要点:

(1)DIM层的设计依据是维度建模理论该层存储维度模型的维度表

(2)DIM层的数据存储在 HBase 表中DIM 层表是用于维度关联的,要通过主键去获取相关维度信息,这种场景下 K-V 类型数据库的效率较高。常见的 K-V 类型数据库有 Redis、HBase,而 Redis 的数据常驻内存,会给内存造成较大压力,因而选用 HBase 存储维度数据。

(3)DIM层表名的命名规范为dim_表名

2、配置表

本层的任务是将业务数据直接写入到不同的 HBase 表中。那么如何让程序知道流中的哪些数据是维度数据?维度数据又应该写到 HBase 的哪些表中?为了解决这个问题,我们选择在 MySQL 中构建一张配置表,通过 Flink CDC 将配置表信息读取到程序中。

1) Flink CDC

Flink CDC 的介绍及配置请参考下面

2) 配置表设计

(1) 字段解析

我们将为配置表设计五个字段
source_table:作为数据源的业务数据表名 
sink_table:作为数据目的地的 Phoenix 表名
sink_columns:Phoenix 表字段
sink_pk:Phoenix 表主键
sink_extend:Phoenix 建表扩展,即建表时一些额外的配置语句

将 source_table 作为配置表的主键,可以通过它获取唯一的目标表名、字段、主键和建表扩展,从而得到完整的 Phoenix 建表语句。

(2) 在Mysql中创建数据库建表并开启Binlog

(1)创建数据库 gmall_config ,注意:和 gmall 业务库区分开

[atguigu@hadoop102 db_log]$ mysql -uroot -p000000 -e"create database gmall_config charset utf8 default collate utf8_general_ci"

(2)在 gmall_config 库中创建配置表 table_process

CREATE TABLE `table_process` (
  `source_table` varchar(200) NOT NULL COMMENT '来源表',
  `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表',
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
  `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
  `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
  PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(3)在MySQL配置文件中增加 gmall_config 开启Binlog

参考以往开启的开启Binlog

3、主要任务

1) 接收Kafka数据,过滤空值数据

对Maxwell抓取的数据进行ETL,有用的部分保留,没用的过滤掉。

2) 动态拆分维度表功能

由于Maxwell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理所以需要把各个维度表拆开处理。

在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。

这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张维度表表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。这种可以有三个方案实现:

1、一种是用Zookeeper存储,通过Watch感知数据变化
2、另一种是用mysql数据库存储,周期性的同步
3、再一种是用mysql数据库存储,使用广播流

这里选择第三种方案,主要是MySQL对于配置数据初始化和维护管理,使用FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。
所以就有了如下图:

在这里插入图片描述

3) 把流中的数据保存到对应的维度表

维度数据保存到HBase的表中

4、代码实现实现

1) 接收Kafka数据,过滤空值数据

(1) 创建 KafkaUtil 工具类

和 Kafka 交互要用到 Flink 提供的 FlinkKafkaConsumer、FlinkKafkaProducer 类,为了提高模板代码的复用性,将其封装到 KafkaUtil 工具类中。

此处从 Kafka 读取数据,创建 getKafkaConsumer(String topic, String groupId) 方法

package org.example.utils;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;


import java.util.Properties;

/**
 * kafka的链接
 */
public class MyKafkaUtil {

    private static String KAFKA_SERVER = "hadoop102:9092";

    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        //消费出来是反序列化(SimpleStringSchema)
        return new FlinkKafkaConsumer<String>(
                topic,
                new KafkaDeserializationSchema<String>() {

                    @Override
                    public boolean isEndOfStream(String s) {
                        //isEndOfStream 是否是流的最后一个
                        //无界的流永远false
                        return false;
                    }

                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        if (record == null || record.value() == null) {
                            //为了解决空指针问题、给他设置空字符串、后面会自动过滤掉这个
                            return "";
                        } else {
                            return new String(record.value());
                        }

                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        //TypeInformation<String> 数据类型
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                },
                properties
        );
    }
}

(2) 主程序

package org.example.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;

import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.example.app.func.DimSinkFunction;
import org.example.app.func.TableProcessFunction;
import org.example.bean.TableProcess;
import org.example.utils.MyKafkaUtil;

import java.util.Properties;

/**
 * 有两个流
 * 一个主流: 来自KAFKA----topic_db  -maxwell写进去的
 * 配置流通过FlinkCDC直接读的
 *
 * // 数据流:web/app -> nginx -> 业务服务器 ->Mysql(binlog) -> Maxwell ->kafka(ODS) -> FlinkApp ->Phoenix
 * //程 序: Mock -> Mysql(binlog) -> Maxwell ->Kafka(ZK) ->DimApp ->Phoenix(HBase/ZK/HDFS)
 */
public class DimApp {
    public static void main(String[] args) throws Exception {

        //1、获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //生产环境中设置为Kafka 主题分区数

        //1.1 开启Checkpoint
//        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE); //5分钟开启一次
//        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L); //超时10分钟
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); //共存的有几个Checkpoint
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5)); //总共会尝试3次重启、每隔5秒尝试一次(固定匹配重启)

        //1.2 设置状态后端(本地的内存级别的)
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8082/211126/ck");
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //2、读取kafka topic_db 主题数据创建主流
        String topic = "topic_db";
        String groupId = "dim_app";

        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        //3、过滤掉非JSON数据 &以及保留新增、变化以及初始化数据并将数据转换为JSON格式
        SingleOutputStreamOperator<JSONObject> filterJsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {

            /**
             * TODO 1、转换成json
             * TODO 2、过滤
             * @param value
             * @param out
             * @throws Exception
             */
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    //将数据转换为JSON
                    JSONObject jsonObject = JSON.parseObject(value);

                    //获取数据中的操作类型字段
                    String type = jsonObject.getString("type");

                    //保留新增、变换以及初始化数据
                    if ("insert".equals(type) || "update".equals(type) || "bootstrap-insert".equals(type)) {
                        out.collect(jsonObject);
                    }


                } catch (Exception e) {
                    System.out.println("发现数据异常");
                }
            }
        });


        //9、启动任务
        env.execute("DimApp");
    }
}

2) 根据MySQL的配置表,动态进行分流

(1) 导入依赖

 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

	<!-- 如果不引入 flink-table 相关依赖,则会报错:
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.base.source.reader.RecordEmitter
引入以下依赖可以解决这个问题(引入某些其它的 flink-table相关依赖也可)
-->
		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>

(2) 创建配置表实体类

package com.atguigu.gmall.realtime.bean;
import lombok.Data;

@Data
public class TableProcess {
    //来源表
    String sourceTable;
    //输出表
    String sinkTable;
    //输出字段
    String sinkColumns;
    //主键字段
    String sinkPk;
    //建表扩展
    String sinkExtend;
}

(3) 编写操作读取配置表形成广播流

        Properties prop = new Properties();
        prop.setProperty("useSSL", "false");
        //4、使用FlinkCDC读取Mysql配置信息表创建配置流
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-config")
                .tableList("gmall-config.table_process")
                .startupOptions(StartupOptions.initial())//全部都要历史数据
                .serverTimeZone("UTC")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .jdbcProperties(prop)
                .build();

        DataStreamSource<String> mySqlSourceDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");

//        mySqlSourceDS.print(">>>");
        //5、将配置流处理为广播流(map状态描述器)
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        //得到广播流
        BroadcastStream<String> broadcastStream = mySqlSourceDS.broadcast(mapStateDescriptor);

        //6、连接主流与广播流d
        BroadcastConnectedStream<JSONObject, String> connectedStream = filterJsonObjDS.connect(broadcastStream);

(4) 定义一个项目中常用的配置常量类GmallConfig

package com.atguigu.gmall.realtime.common;

public class GmallConfig {

    // Phoenix库名
    public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";

    // Phoenix驱动
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    // Phoenix连接参数
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
}

3) 自定义函数MyBroadcastFunction

(1) 定义类MyBroadcastFunction

自定义广播流

在这里插入图片描述

package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class MyBroadcastFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

	private MapStateDescriptor<String, TableProcess> tableConfigDescriptor;

public MyBroadcastFunction(MapStateDescriptor<String, TableProcess> tableConfigDescriptor) {
        this.tableConfigDescriptor = tableConfigDescriptor;
}

    @Override
    public void processElement(JSONObject jsonObj, ReadOnlyContext readOnlyContext, Collector<JSONObject> out) throws Exception {

    }

    @Override
    public void processBroadcastElement(String jsonStr, Context context, Collector<JSONObject> out) throws Exception {

    }
}

(2) 自定义函数MyBroadcastFunction-open

// 定义Phoenix的连接
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("parameters....: " + parameters);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
        System.out.println("connection....: " + connection);
    }

(3) 自定义函数MyBroadcastFunction-processBroadcastElement

    /**
     * >>>> {"before":null, "after":{"source_table":"aa","sink_table":"aaa","sink_columns":"aa","sink_pk":"aa","sink_extend":"aa"},
     * "source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1670134486528,"snapshot":"false",
     * "db":"gmall-config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},
     * "op":"r","ts_ms":1670134486531,"transaction":null}
     * 广播流数据、配置流
     * <p>
     * todo 1、获取并解析数据,方便主流操作
     * todo 2、校验表是否存在,如果不存在则需要在Phoenix中建表 checkTable
     * todo 3、写入状态,广播出去 (处理广播流数据)
     *
     * @param value
     * @param ctx
     * @param collector
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> collector) throws Exception {

        //TODO 1、获取并解析数据
        JSONObject jsonObject = JSON.parseObject(value);
        TableProcess tableProcess = JSON.parseObject(jsonObject.getString("after"), TableProcess.class);

        //TODO 2、校验并建表
        checkTable(tableProcess.getSinkTable(),
                tableProcess.getSinkColumns(),
                tableProcess.getSinkPk(),
                tableProcess.getSinkExtend());

        //TODO 3、写入状态,广播出去--(getBroadcastState获取广播状态)
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        broadcastState.put(tableProcess.getSourceTable(), tableProcess);

    }

(4) 自定义函数MyBroadcastFunction-checkTable

在 Phoenix 建表之前要先创建命名空间 GMALL_REALTIM。
0: jdbc:phoenix:> create schema GMALL2022_REALTIME;
checkTable() 方法如下
  /**
     * 校验并建表
     *
     * @param sinkTable   Phoenix表名
     * @param sinkColumns Phoenix表字段
     * @param sinkPk      Phoenix表主键
     * @param sinkExtend  Phoenix扩展字段
     */
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {

        PreparedStatement preparedStatement = null;

        try {
            //拼接SQL
            //处理特殊字段
            if (sinkPk == null || "".equals(sinkPk)) {
                sinkPk = "id";
            }

            if (sinkExtend == null || "".equals(sinkExtend)) {
                sinkExtend = "";
            }

            //拼接SQL: create table if not exists db.tn(id varchar primary key,bb varchar,cc varchar ) xxx
            StringBuilder createTableSql = new StringBuilder("create table if not exists ")
                    .append(GmallConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append("(");

            String[] columns = sinkColumns.split(",");
            for (int i = 0; i < columns.length; i++) {
                //取出字段
                String column = columns[i];

                //判断是否为主键
                if (sinkPk.equals(column)) {
                    createTableSql.append(column).append(" varchar primary key");
                } else {
                    createTableSql.append(column).append(" varchar");
                }

                //判断是否为最后一个字段 ---只要不是最后一个都要拼接一个逗号
                if (i < columns.length - 1) {
                    createTableSql.append(",");
                }
            }

            createTableSql.append(")").append(sinkExtend);

            //编译SQL
            System.out.println("建表语句: " + createTableSql);
            preparedStatement = connection.prepareStatement(createTableSql.toString());

            //执行SQL,建表
            preparedStatement.execute();

        } catch (SQLException e) {
            //编译异常转换为运行时异常、主要是解决、出问题、程序必须停止
            throw new RuntimeException("建表失败" + sinkTable);
        } finally {
            //释放资源
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

    }

(5) 自定义函数MyBroadcastFunction-processElement()

 /**
     * 主流
     * {"database":"gmall-211126-flink","table":"base_trademark","type":"bootstrap-insert","ts":1669877319,
     * "data":{"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}}
     * <p>
     * todo 1、获取广播流的配置数据
     * todo 2、过滤字段 filterColumn
     * todo 3、补充SinkTable字段输出
     *
     * @param value
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //1、获取广播的配置数据
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

        String table = value.getString("table"); //只会获取维表数据
        TableProcess tableProcess = broadcastState.get(table);
        System.out.println(".....tableProcess"+tableProcess);
        System.out.println(".....table"+table);
        if (tableProcess != null) {
            //2、过滤字段
            filterColumn(value.getJSONObject("data"), tableProcess.getSinkColumns());

            //3、补充SinkTable并写出到流中
            value.put("sinkTable", tableProcess.getSinkTable());
            out.collect(value);
        } else {
            System.out.println("找不到对应的key: " + table);
        }


    }

(6) 自定义函数MyBroadcastFunction-filterColumns(),校验字段,过滤掉多余的字段

    /**
     * 过滤字段
     *
     * @param data        "data":{"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}
     * @param sinkColumns "id,tm_name"
     */
    private void filterColumn(JSONObject data, String sinkColumns) {

        //切分sinkColumns
        String[] columns = sinkColumns.split(",");
        List<String> columnList = Arrays.asList(columns);

//        Set<Map.Entry<String, Object>> entrySet = data.entrySet();
//        Iterator<Map.Entry<String, Object>> iterator = entrySet.iterator();
//        while (iterator.hasNext()) {
//            Map.Entry<String, Object> next = iterator.next();
//            if (!columnList.contains(next.getKey())) {
//                iterator.remove();
//            }
//
//        }

        Set<Map.Entry<String, Object>> entrySet = data.entrySet();
        entrySet.removeIf(next -> !columnList.contains(next.getKey()));
    }

(7) 主程序DimSinkApp中调用MyBroadcastFunction提取维度数据

       SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));

4) 保存维度到HBase(Phoenix)

(1) 程序流程分析

在这里插入图片描述

DimSink 继承了RickSinkFunction,这个function得分两条时间线:
一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行;
另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。

(2) 创建 PhoenixUtil 工具类,在其中创建insertValues()方法

package com.atguigu.gmall.realtime.util;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;

import java.sql.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

public class PhoenixUtil {
    /**
     * Phoenix 表数据导入方法
     *
     * @param conn 连接对象
     * @param sinkTable 写入数据的 Phoenix 目标表名
     * @param data      待写入的数据
     */
    public static void insertValues(Connection conn, String sinkTable, JSONObject data) {
        // 获取字段名
        Set<String> columns = data.keySet();
        // 获取字段对应的值
        Collection<Object> values = data.values();
        // 拼接字段名
        String columnStr = StringUtils.join(columns, ",");
        // 拼接字段值
        String valueStr = StringUtils.join(values, "','");
        // 拼接插入语句
        String sql = "upsert into " + GmallConfig.HBASE_SCHEMA
                + "." + sinkTable + "(" +
                columnStr + ") values ('" + valueStr + "')";

        // 为数据库操作对象赋默认值
        PreparedStatement preparedSt = null;

        // 执行 SQL
        try {
            preparedSt = conn.prepareStatement(sql);
            preparedSt.execute();
            // 提交事务
            conn.commit();
        } catch (SQLException sqlException) {
            sqlException.printStackTrace();
            throw new RuntimeException("数据库操作对象获取或执行异常");
        } finally {
            if (preparedSt != null) {
                try {
                    preparedSt.close();
                } catch (SQLException sqlException) {
                    sqlException.printStackTrace();
                    throw new RuntimeException("数据库操作对象释放异常");
                }
            }
        }
}
}

(3) MyPhoenixSink

自定义 SinkFunction 子类 MyPhoenixSink,在其中调用 Phoenix 工具类的 insertValues(String sinkTable, JSONObject data) 方法,将维度数据写出到 Phoenix 的维度表中。为了提升效率,减少频繁创建销毁连接带来的性能损耗,创建连接池。

(1)添加德鲁伊连接池依赖

  <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.16</version>
        </dependency>

(2)连接池创建工具类

package com.atguigu.gmall.realtime.util;

import com.alibaba.druid.pool.DruidDataSource;

public class DruidDSUtil {
    private static DruidDataSource druidDataSource;

    public static DruidDataSource createDataSource() {
        // 创建连接池
        druidDataSource = new DruidDataSource();
        // 设置驱动全类名
        druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);
        // 设置连接 url
        druidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);
        // 设置初始化连接池时池中连接的数量
        druidDataSource.setInitialSize(5);
        // 设置同时活跃的最大连接数
        druidDataSource.setMaxActive(20);
        // 设置空闲时的最小连接数,必须介于 0 和最大连接数之间,默认为 0
        druidDataSource.setMinIdle(1);
        // 设置没有空余连接时的等待时间,超时抛出异常,-1 表示一直等待
        druidDataSource.setMaxWait(-1);
        // 验证连接是否可用使用的 SQL 语句
        druidDataSource.setValidationQuery("select 1");
        // 指明连接是否被空闲连接回收器(如果有)进行检验,如果检测失败,则连接将被从池中去除
        // 注意,默认值为 true,如果没有设置 validationQuery,则报错
        // testWhileIdle is true, validationQuery not set
        druidDataSource.setTestWhileIdle(true);
        // 借出连接时,是否测试,设置为 false,不测试,否则很影响性能
        druidDataSource.setTestOnBorrow(false);
        // 归还连接时,是否测试
        druidDataSource.setTestOnReturn(false);
        // 设置空闲连接回收器每隔 30s 运行一次
        druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
        // 设置池中连接空闲 30min 被回收,默认值即为 30 min
        druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);

        return druidDataSource;
    }
}

(3)MyPhoenixSink 函数

package org.example.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.example.utils.DruidDSUtil;
import org.example.utils.PhoenixUtil;

public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    private DruidDataSource druidDataSource = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        druidDataSource = DruidDSUtil.createDataSource();

    }

    /**
     * 每来一次数据调用一次
     * {"database":"gmall-211126-flink","table":"base_trademark","type":"insert","ts":1669876072,"xid":1470,"commit":true,
     * "data":{"id":13,"tm_name":"atguigu","logo_url":"aaa/aaa"},"sinkTable":"dim_xxx"}
     * <p>
     * {"database":"gmall-211126-flink","table":"base_trademark","type":"update","ts":1669876141,"xid":1630,"commit":true,
     * "data":{"id":13,"tm_name":"atguigu","logo_url":"bbb/bbb"},"old":{"logo_url":"aaa/aaa"},"sinkTable":"dim_xxx"}
     * <p>
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {

        //获取链接
        DruidPooledConnection connection = druidDataSource.getConnection();

        //写出数据
        String sinkTable = value.getString("sinkTable");
        JSONObject data = value.getJSONObject("data");
        PhoenixUtil.upSerValues(connection, sinkTable, data);

        //归还数据 --连接池 -不是真的把链接释放掉了
        connection.close();

    }
}

(4) 主程序 DimSinkApp 中调用 MyPhoenixSink

// TODO 9. 将数据写入 Phoenix 表
        dimDS.addSink(new MyPhoenixSink());

(5) 测试

(1)启动HDFS、ZK、Kafka、Maxwell、HBase
(2)运行 IDEA 中的 DimSinkApp
(3)执行 mysql_to_kafka_init.sh 脚本
mysql_to_kafka_init.sh all
(4)通过phoenix查看hbase的schema以及表情况

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/107144.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(野火征途 Altera EP4CE10)硬件说明

开发板买了好久了&#xff0c;但是一直都没有去学习。本着不浪费的想法&#xff0c;且通过记笔记来监督自己. FPGA FPGA是一种可以重构电路的芯片&#xff0c;是一种硬件可重构的体系结构。通过编程&#xff0c;用户可以随时改变它的应用场景&#xff0c;它可以模拟CPU、GPU等…

第13章 事务

第13章 事务 考试范围&#xff1a; 13.1-13.10 考试题型&#xff1a; 事务操作 考试内容&#xff1a; 1、事务的概念与特性(ACID) 概念 A transaction is a unitof program execution that accesses and possibly updates various data items事务是程序执行的单元&#xff…

云原生|kubernetes|CKA真题解析-------(11-17题)

第十一题&#xff1a; 创建多容器的pod 题目要求&#xff1a;解析&#xff1a; 多容器pod的创建&#xff0c;先创建一个单容器的pod&#xff0c;然后在此基础上修改即可 解答&#xff1a; 先创建单容器的pod kubectl run kucc1 --imagenginx --dry-runclient -oyaml >11…

css实现环形进度条

效果&#xff1a; 纯css实现进度条&#xff0c;这里用到的核心属性为box-show&#xff0c;box-show可以控制元素的阴影&#xff0c;通过控制元素阴影的移动位置来实现进度条效果。 .box{box-show : 0px 0px 0px 0px #ccc; }box-show有5个参数 第一个参数&#xff1a; 控制元…

第4章 中级SQL

第4章 中级SQL 考试范围&#xff1a; 4.1-4.7 考试题型&#xff1a; 计算题 考试内容&#xff1a; 连接类型&#xff08;与第3章合并考察&#xff09; 视图的定义与使用 事务&#xff08;与17-19章合并考察&#xff09; 完整性的概念 SQL中如何定义、修改各类完整性(Pr…

JVS低代码多账号统一登录介绍

登录操作演示 统一登录能力 JVS整个系统认证采用Oauth2 认证方案&#xff0c;目前支持目前登陆方式如下&#xff1a; 登录方式 说明 账号密码登录 基于JVS的用户名用户密码登录 手机动态验证码登录 基于JVS用户绑定的手机号动态验证码登录 微信扫码关注公众号登录 基于…

web前端-javascript-Math对象(说明和方法,它封装了数学运算相关的属性和方法)

文章目录Math 对象1. 说明2. 方法1) abs()2) Math.ceil()3) Math.floor()4) Math.round()5) Math.random()6) max 和 min7) Math.pow(x,y)8) Math.sqrt()Math 对象 1. 说明 Math 和其他的对象不同&#xff0c;它不是一个构造函数它属于一个工具类不用创建对象&#xff0c;它里…

仿真设计|基于51单片机的简易抢答器

目录 前言 具体实现功能 设计介绍 51单片机简介 设计方案 资料内容 仿真实现&#xff08;protues8.7&#xff09; 程序&#xff08;Keil5&#xff09; 全部资料&#xff08;压缩文件&#xff09; 前言 全部资料包括程序(Keil5)、protues仿真(protues8.7)、仿真视频、…

教育领域知识图谱

教育领域开源的知识图谱实体 在教育领域,有许多开源的知识图谱实体可供使用。下面列出了一些例子: DBpedia:这是一个知识图谱,由 Wikipedia 的内容构建而成。DBpedia 中包含了许多关于人、地方、事物和概念的实体,并且这些实体都具有相关的属性和关系。 Wikidata:这是一个…

LeetCode算法之--二叉树系列

点赞收藏&#xff0c;以防遗忘 本文【程序大视界】已收录&#xff0c;关注免费领取互联网大厂学习资料&#xff0c;添加博主好友进群学习交流&#xff0c;欢迎留言和评论&#xff0c;一起交流共同进步。 【一】前言 二叉树也是面试算法的常见题型&#xff0c;通常程序会自定义…

Go秒杀系统——RabbitMQ核心概念与工作模式

前言&#x1f4ac; Windows 上的 RabbitMQ 被我卸载了&#xff0c;在 macOS 上再安装一下&#xff0c;采用 brew install 还是挺方便的。 很好奇微软的程序员写代码用的是 Windows 操作系统吗&#xff1f;感觉有点不方便&#xff0c;但用 macOS 岂不是太丢撵了。 一、macOS 安装…

APS排程软件提升电子产品生产企业的服务效益

"3C产品"&#xff0c;就是计算机、通信和消费类电子产品三者结合&#xff0c;也称"信息家电"。由于3C产品的体积一般都不大&#xff0c;所以往往在中间加一个"小"字&#xff0c;故往往统称为"3C小家电"。 据报道&#xff0c;某一科技公…

使用DoraCloud免费版搭建办公桌面云

DoraCloud是一款多平台的桌面虚拟化管理软件&#xff0c;支持Hyper-V、VMware、Proxmox、XenServer等多种虚拟化平台。DoraCloud在虚拟化平台上具有极大的灵活性&#xff0c;允许您的组织自由选择合适的IT基础设施来构建桌面云&#xff1b;也允许您的组织重用现有的IT设施基础&…

B树和B+树的详解讲解

1.B树 前面我们已经学习了二叉查找树、2-3树以及它的实现红黑树。2-3树中&#xff0c;一个结点做多能有两个key&#xff0c;它的实现红黑树中使用对链接染色的方式去表达这两个key。接下来我们学习另外一种树型结构B树&#xff0c;这种数据结构中&#xff0c;一个结点允许多于…

Java LinkedList

链表&#xff08;Linked list&#xff09;是一种常见的基础数据结构&#xff0c;是一种线性表&#xff0c;但是并不会按线性的顺序存储数据&#xff0c;而是在每一个节点里存到下一个节点的地址。 链表可分为单向链表和双向链表。 一个单向链表包含两个值: 当前节点的值和一个…

linux软件安装

软件安装1.安装方式2.安装jdk3.安装Tomcat4.安装mysql5.安装lrzsz1.安装方式 2.安装jdk &#xff08;1&#xff09;使用 Xftp 将jdk的二进制包上传到 Linux。 关于 Xftp 的下载和安装看这里&#xff1a; https://blog.csdn.net/weixin_56680764/article/details/126335138 本文…

C++:using : using的四大用法总结

1&#xff1a;using声明&#xff08;引入单个名称&#xff09; using声明是将命名空间中某个名字单独引入到当前作用域&#xff0c;这使得我们在当前作用域下可以直接使用该名字而无需使用作用域限定符 :: 。 #include <string> using std::string; int main() {string…

TKDE2022 | 基于关系的协同过滤算法,利用注意力机制来学习物品关系的嵌入特征...

嘿&#xff0c;记得给“机器学习与推荐算法”添加星标鉴于经典的协同过滤算法的有效性和易用性&#xff0c;基于物品的协同过滤方法已被广泛应用于工业领域&#xff0c;并在近年来被广泛研究。基于物品的协同过滤方法的关键在于物品之间的相似度测量&#xff0c;但本文认为这是…

推荐系统学习笔记-推荐系统数据流

一般数据流 数据流的定义 数据流&#xff08;data stream&#xff09;是一组有序&#xff0c;有起点和终点的字节的数据序列。包括输入流和输出流。数据流最初是通信领域使用的概念&#xff0c;代表传输中所使用的信息的数字编码信号序列。这个概念最初在1998年由Henzinger提…

《软件开发本质论》笔记——如何衡量价值

目录 一、使用数值来表示价值的做法 二、大多数与金钱有关的衡量指标的局限性 三、有没有一个简单的衡量方式 四、价值评估更好的做法 个人理解的价值 一、使用数值来表示价值的做法 这种方式可能让我们滑入深渊。 比如 如果公司开发产品的目的是赚钱&#xff0c;那么就可…