[电商实时数仓] 用户行为数据和业务数据采集以及ODS层

news2024/10/5 16:32:12

文章目录

      • 1.数据仓库环境准备
        • 1.1 导入依赖
        • 1.2 创建相关包
      • 2.数据仓库运行环境
        • 2.1 Hbase环境
        • 2.2 模拟数据
      • 3.数仓开发之ODS层
        • 3.1 mysql配置修改
        • 3.2 FlinkCDC的程序
        • 3.3 结果检测

1.数据仓库环境准备

1.1 导入依赖

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--如果保存检查点到hdfs上,需要引入此依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <!-- 打包时不复制META-INF下的签名文件,避免报非法签名文件的SecurityExceptions异常-->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <!-- connector和format依赖的工厂类打包时会相互覆盖,需要使用ServicesResourceTransformer解决-->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

1.2 创建相关包

在这里插入图片描述

log.properties

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n

log4j.rootLogger=error,stdout

2.数据仓库运行环境

需要搭建Flink, HBase, Mysql, Redis, ClickHouse 环境

2.1 Hbase环境

pom

<dependency>
<groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>

hbase-site.xml

<configuration>
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://hadoop102:8020/HBase</value>
</property>

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>

<property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
</property>

<property>
    <name>hbase.unsafe.stream.capability.enforce</name>
    <value>false</value>
</property>

<property>
    <name>hbase.wal.provider</name>
    <value>filesystem</value>
</property>

<!-- 注意:为了开启hbase的namespace和phoenix的schema的映射,在程序中需要加这个配置文件,另外在linux服务上,也需要在hbase以及phoenix的hbase-site.xml配置文件中,加上以上两个配置,并使用xsync进行同步(本节1中文档已有说明)。-->
<property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
</property>

<property>
    <name>phoenix.schema.mapSystemTablesToNamespace</name>
    <value>true</value>
</property>
</configuration>

2.2 模拟数据

通常企业在开始搭建数仓时,业务系统中会存在历史数据,一般是业务数据库存在历史数据,而用户行为日志无历史数据。假定数仓上线的日期为2023-01-21,为模拟真实场景,需准备以下数据。

1)用户行为日志
用户行为日志,一般是没有历史数据的,故日志只需要准备2023-01-21一天的数据。具体操作如下:
(1)启动 Kafka。
(2)启动一个命令行 Kafka 消费者,消费 topic_log 主题的数据。
(3)修改两个日志服务器(hadoop102、hadoop103)中的
/opt/module/applog/application.yml配置文件,将mock.date参数改为2023-01-21。
(4)执行日志生成脚本lg.sh。
(5)观察命令行 Kafka 消费者是否消费到数据。

在这里插入图片描述

2)业务数据
实时计算不考虑历史的事实数据,但要考虑历史维度数据。因此要对维度相关的业务表做一次全量同步。
(1)维度数据首日全量同步
①修改Maxwell配置文件中的mock_date参数

[atguigu@hadoop102 maxwell]$ vim /opt/module/maxwell/config.properties

mock_date=2023-01-21

②启动业务数据采集通道,包括Maxwell、Kafka
③编写业务数据首日全量脚本
与维度相关的业务表如下

activity_info
activity_rule
activity_sku
base_category1
base_category2
base_category3
base_province
base_region
base_trademark
coupon_info
coupon_range
financial_sku_cost
sku_info
spu_info
user_info

切换到 /home/atguigu/bin 目录,创建 mysql_to_kafka.sh 文件

[atguigu@hadoop102 maxwell]$ cd ~/bin
[atguigu@hadoop102 bin]$ vim mysql_to_kafka_init.sh
#!/bin/bash

# 该脚本的作用是初始化所有的业务数据,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"activity_info")
  import_data activity_info 
  ;;
"activity_rule")
  import_data activity_rule 
  ;;
"activity_sku")
  import_data activity_sku 
  ;;
"base_category1")
  import_data base_category1 
  ;;
"base_category2")
  import_data base_category2 
  ;;
"base_category3")
  import_data base_category3 
  ;;
"base_province")
  import_data base_province 
  ;;
"base_region")
  import_data base_region 
  ;;
"base_trademark")
  import_data base_trademark 
  ;;
"coupon_info")
  import_data coupon_info 
  ;;
"coupon_range")
  import_data coupon_range 
  ;;
"financial_sku_cost")
  import_data financial_sku_cost 
  ;;
"sku_info")
  import_data sku_info 
  ;;
"spu_info")
  import_data spu_info 
  ;;
"user_info")
  import_data user_info 
  ;;
"all")
  import_data activity_info 
  import_data activity_rule 
  import_data activity_sku 
  import_data base_category1 
  import_data base_category2 
  import_data base_category3 
  import_data base_province 
  import_data base_region 
  import_data base_trademark 
  import_data coupon_info 
  import_data coupon_range 
  import_data financial_sku_cost 
  import_data sku_info 
  import_data spu_info 
  import_data user_info 
  ;;
esac

增加执行权限

[atguigu@hadoop102 bin]$ chmod +x mysql_to_kafka_init.sh

④启动 Kafka 消费者,观察数据是否写入 Kafka

[atguigu@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db

(2)业务数据生成
业务数据生成之前要将 application. properties 文件中的 mock.date 参数修改为业务时间,首日应设置为 2023-01-21。

同时要保证 Maxwell 配置文件 config.properties 中的 mock.date 参数和 application. properties 中 mock.date 参数的值保持一致。

3.数仓开发之ODS层

采集到 Kafka 的 topic_log 和 ods_base_db主题的数据即为实时数仓的 ODS 层,这一层的作用是对数据做原样展示和备份。

采用FlinkCDC去采集ODS数据

3.1 mysql配置修改

修改mysql配置

vim /etc/my.cnf

在这里插入图片描述

加上binlog-do-db=gmall-210325-flink, 表示监视gmall-210325-flink库的全部表

3.2 FlinkCDC的程序

public class Flink_CDCWithCustomerSchema {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .username("root")
                .password("root")
                .databaseList("gmall-210325-flink")
                .tableList("")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();

        DataStreamSource<String> streamSource = environment.addSource(sourceFunction);

        streamSource.addSink(MyKafkaUtil.getKafkaProducer("ods_base_db"));

        environment.execute();
    }
}

自定义序列化格式

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
    // 格式 Json
    // database, tableName, type, before, after
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        String topic = sourceRecord.topic();
        String[] split = topic.split(".");
        String database = split[1];
        String tableName = split[2];

        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null){
            Schema schema = value.schema();
            List<Field> fields = schema.fields();
            for (int i = 0; i < fields.size(); i++) {
                Field field = fields.get(i);
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null){
            Schema schema = value.schema();
            List<Field> fields = schema.fields();
            for (int i = 0; i < fields.size(); i++) {
                Field field = fields.get(i);
                Object afterValue = before.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        // 操作字段
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)){
            type = "insert";
        }
        System.out.println(operation);

        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);



        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Kafka工具类

public class MyKafkaUtil {
    public static String KAFKA_SERVER =
            "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    public static String default_topic = "DWD_DEFAULT_TOPIC";

    private static Properties properties = new Properties();

    static {
        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
    }
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);
    }
}

开启Kafka和Zookeeper

在这里插入图片描述

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap.server hadoop102:9092 --topic ods_base_db

3.3 结果检测

在这里插入图片描述

在表中z_user_info中加入 [12, atguigu] 这条数据。
因为在CDC中设计的是监视的为database中所有的表

java程序:

在这里插入图片描述

Kafka程序:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/180023.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为什么你的Facebook广告策略应该包括SEO

最近在看了很多关于 SEO的文章&#xff0c;今天想跟大家分享一些我个人关于 Facebook广告中的 SEO策略&#xff0c;以及它为什么是必要的。虽然在我看来&#xff0c;所有营销手段都需要结合 SEO才能发挥最大作用&#xff0c;但这并不意味着要完全放弃 SEO。如果你对以下问题感兴…

分享147个ASP源码,总有一款适合您

ASP源码 分享147个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 147个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/1us1KTsxeaZlosHsqvrkC5Q?pwd81pl 提取码&#x…

Leetcode:51. N 皇后(C++)

目录 问题描述&#xff1a; 实现代码与解析&#xff1a; 回溯&#xff1a; 原理思路&#xff1a; 问题描述&#xff1a; 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&a…

数字电位器程控可调电阻ic

一、前言 数字电位器又叫可编程电阻器&#xff0c;是一种替代传统机械电位器的新型CMOS数字、模拟混合信号处理集成电路&#xff0c;不需要搭建复杂的电路环境即可简单的通过CPU数字通讯实现电路调节&#xff0c;数字电位器也不能完全替代传统的机械电位器&#xff0c;在很多场…

Sentinel(限流、熔断、降级)、SpringBoot整合Sentinel、Sentinel的使用-60

一&#xff1a;Sentinel简介 Sentinel就是分布式系统的流量防卫兵 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点&#xff0c;从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。 1.1 官方文档 官方文档&#…

基于OpenXR,Collabora推开源VI-SLAM AR/VR定位系统

XR最关键的难题之一就是定位&#xff0c;为了定位XR头显在现实世界中的位置和角度&#xff0c;厂商们采用了多种方案&#xff0c;比如机械传感器、惯性传感器、磁传感器、声学传感器等等。这些定位方式有一个共同的问题&#xff0c;那就是传感器不够完善&#xff0c;且会产生噪…

uniapp的父传子,子传父,子组件与父组件数据同步(.sync)的理解

父传子&#xff1a; 父调用 绑定的子组件中state然后 mystate1赋值false 给子组件中的state。并在子组件中显示父中传来的值。 注意要在子组件中设置 props【属性】不然父中的值无法传过去。 <view >开启{{mystate1}}</view> --调用子组件mypop&#xff0c;并传值…

学习记录673@项目管理之进度管理案例

本文主要是进度管理之关键链路法的案例。 案例 Perfect 项目的建设方要求必须按合同规定的期限交付系统&#xff0c;承建方项目经理李某决定严格执行项目进度管理&#xff0c;以保证项目按期完成。他决定使用关键路径法来编制项目进度网络图。在对工作分解结构进行认真分析后&…

05 二叉树前序/中序/后序线索化和找前驱、后继

1. 线索化代码 线索化需要先序/中序/后续遍历的过程&#xff0c;多了访问到节点时指针指向的问题 二叉树形状和运行结果 主函数 #include "func.h"// 二叉树线索化(便于找前驱和后继节点) // 1. 二叉树先序线索化 // 2. 二叉树中序线索化 // 3. 二叉树后序线索化//…

《MySQL》MySQL简单操作

最近开始了新的学习进度 进入MySQL数据库的学习 目录 一、MySQL启动方法 1.使用MySQL启动 2.使用cmd启动 二、数据库的简单操作命令 显示当前服务器上有哪些数据库 创建新的数据库 删除数据库 选中数据库 三、数据表的操作 数据类型 四、表的简单操作 查看数据库中的…

Java多线程-线程的生命周期

Java多线程-线程的生命周期 线程的状态 New 表示线程已创建&#xff0c;没启动的状态此时已经做了一些准备工作&#xff0c;还没有执行run方法中代码 Runnable 调用start方法之后的状态&#xff0c;表示可运行状态(不一定正在运行&#xff0c;因为调用start方法之后不一定立…

分享148个ASP源码,总有一款适合您

ASP源码 分享148个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 148个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/1e2PvBmXxZA8C3IelkP8ZtQ?pwdj7lp 提取码&#x…

node.js 安装步骤

1、下载安装包 Node.js 官方网站下载&#xff1a;Node.js 选择操作系统对应的包&#xff1a; 下载完成&#xff0c;安装包如下&#xff1a; 2、安装Node 打开安装&#xff0c;傻瓜式下一步即可&#xff1a; 选择安装位置&#xff0c;我这里装在D盘下&#xff1a; 安装成功&…

图和树基础算法笔记

图的大部分知识在《离散数学》中都已经学习了&#xff0c;所以我主要放一些不知道的知识 常用概念 有很少边或弧&#xff08;如 e < n log n&#xff0c;e指边数&#xff0c;n指顶点数&#xff09;的图称为稀疏图&#xff0c;反之称为稠密图。完全图&#xff1a;每个顶点的…

[引擎开发] 现代图形API - dx12篇

本文将从性能优化的角度去阐述像dx12这样的现代图形API的一些设计理念。 当我们深入优化渲染管线的时候&#xff0c;我们会发现存在的几个瓶颈主要是这样的&#xff1a; ① 线程存在不合理的等待 ② CPU向GPU编码传输数据非常耗时 ③ CPU频繁地切换渲染上下文非常耗时 因此有时…

Python---库的使用

专栏&#xff1a;python 个人主页&#xff1a;HaiFan. 专栏简介&#xff1a;本专栏主要更新一些python的基础知识&#xff0c;也会实现一些小游戏和通讯录&#xff0c;学时管理系统之类的&#xff0c;有兴趣的朋友可以关注一下。 库前言标准库使用import导入模块例1例2例3 文件…

老司机经验分享:生产级中间件系统架构设计实践

目录 1、Master-Slave架构2、异步日志持久化机制3、检查点机制&#xff1a;定时持久化全量数据4、引入检查点节点5、总结 & 思考 这篇文章&#xff0c;给大家来聊一个生产级的中间件系统的架构设计实践&#xff0c;希望给对中间件系统感兴趣的同学一点启发。 1、Master-S…

【Java|golang】1663. 具有给定数值的最小字符串---int32切片类型转化string

小写字符 的 数值 是它在字母表中的位置&#xff08;从 1 开始&#xff09;&#xff0c;因此 a 的数值为 1 &#xff0c;b 的数值为 2 &#xff0c;c 的数值为 3 &#xff0c;以此类推。 字符串由若干小写字符组成&#xff0c;字符串的数值 为各字符的数值之和。例如&#xff…

修改VS2015的文件编码格式为utf8,解决在Ubuntu下中文输出为乱码的问题

开发环境&#xff1a; Windows系统&#xff1a;Windows 10 家庭版&#xff0c;VS2015社区版 Linux系统&#xff1a;Ubuntu 22.04 LTS Server版&#xff0c;gcc version 11.3.0 (Ubuntu 11.3.0-1ubuntu1~22.04) 今天编写跨平台的代码&#xff0c;在Windows下用VS2015编写&a…

C++ AVL树

前言 众所周知红黑树是由AVL树改进得来的&#xff0c;想要深入学习哈希表的底层存储那么AVL的学习就相当有必要了。 本来想将AVL的插入删除都能实现&#xff0c;但是在写删除功能时碰到了难题和Bug&#xff0c;所以暂时先给出插入的实现过程&#xff0c;和删除功能的实现思路 …