python毕设选题 - flink大数据淘宝用户行为数据实时分析与可视化

news2024/10/5 15:25:13

文章目录

  • 0 前言
  • 1、环境准备
    • 1.1 flink 下载相关 jar 包
    • 1.2 生成 kafka 数据
    • 1.3 开发前的三个小 tip
  • 2、flink-sql 客户端编写运行 sql
    • 2.1 创建 kafka 数据源表
    • 2.2 指标统计:每小时成交量
      • 2.2.1 创建 es 结果表, 存放每小时的成交量
      • 2.2.2 执行 sql ,统计每小时的成交量
    • 2.3 指标统计:每10分钟累计独立用户数
      • 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
      • 2.3.2 创建视图
      • 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
    • 2.4 指标统计:商品类目销量排行
      • 2.4.1 创建商品类目维表
      • 2.4.1 创建 es 结果表,存放商品类目排行表
      • 2.4.2 创建视图
      • 2.4.3 执行 sql , 统计商品类目销量排行
  • 3、最终效果与体验心得
    • 3.1 最终效果
    • 3.2 体验心得
      • 3.2.1 执行
      • 3.2.2 存储
  • 4 最后


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH (
    'connector.type' = 'kafka', 
    'connector.version' = 'universal',  
    'connector.topic' = 'user_behavior',  
    'connector.startup-mode' = 'earliest-offset', 
    'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 
    'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 
    'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'update-mode' = 'append',
    'format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',    
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (
    sub_category_id BIGINT,
    parent_category_name STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (
    category_name  STRING,
    buy_cnt  BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1357682.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IOC解决程序耦合

1.什么是IOC IOC (Inverse of Control)即控制反转:由ioc容器来创建依赖对象,程序只需要从IOC容器获取创建好的对象。 我们在获取对象时,都是采用new的方式。是主动的。 我们获取对象时,同时跟工厂要,有工厂为我们查找…

大数据Doris(五十):数据导出的其他导出案例参考

文章目录 数据导出的其他导出案例参考 一、​​​​​

乘号在键盘上怎么打?速来学习这4个方法!

“我在用电脑编辑文档时,需要输入一些数学公式,现在我不知道乘号怎么打,有朋友可以教教我吗?” 乘号是数学中常用的符号,用于表示两个数的相乘关系。很多用户在使用电脑办公时可能需要输入乘号。但是却发现键盘上没有明…

sql如何获取字段是数组中的数字【搬代码】

我们可以看到表中字段是一个数组怎么获取其中的数据呢? SELECT sim->>$[0] FROM fin_xxx如果使用左外链接,如下,其他连接时一样的 SELECT a.* FROM fin_aaaa a LEFT JOIN fin_xxx b ON b.sim_r->>$[0]a.corr WHERE b.tid20210 …

【React系列】Redux(一)管理状态

本文来自#React系列教程:https://mp.weixin.qq.com/mp/appmsgalbum?__bizMzg5MDAzNzkwNA&actiongetalbum&album_id1566025152667107329) 在React的开发过程中,Redux对于我们是非常重要的。 但是对于很多人来说,初次接触redux会感觉r…

Leetcode_day01_88合并两个有序数组

Leetcode_day01_88合并两个有序数组 题目描述: 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2,另有两个整数 m 和 n ,分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中,使合并后的数组同样按 非递减顺…

锂电池寿命预测 | Matlab基于LSTM长短期记忆神经网络的锂电池寿命预测

目录 预测效果基本介绍程序设计参考资料 预测效果 基本介绍 锂电池寿命预测 | Matlab基于LSTM长短期记忆神经网络的锂电池寿命预测 程序设计 完整程序和数据获取方式:私信博主回复Matlab基于LSTM长短期记忆神经网络的锂电池寿命预测。 参考资料 [1] http://t.csdn…

【已解决】若依系统前端打包后,部署在nginx上,点击菜单错误:@/views/system/role/index

​ 上面错误,是因为/views/system/role/index动态路由按需加载时候,错误导致。 解决办法: 如果您的前端项目访问时候,需要带有项目名称的话,参考凯哥上一篇文章:【已解决】若依前后端分离版本&#xff0…

《MySQL系列-InnoDB引擎05》MySQL索引与算法

文章目录 第五章 索引与算法1 InnoDB存储引擎索引概述2 数据结构与算法2.1 二分查找法2.2 二分查找树和平衡二叉树 3 B树3.1 B树的插入操作3.2 B树的删除操作 4 B树索引4.1 聚集索引4.2 辅助索引4.3 B树索引的分裂 5 Cardinality值5.1 什么是Cardinality5.2 InnoDB存储引擎的Ca…

树与二叉树笔记整理

摘自小红书 ## 树与二叉树 ## 排序总结

Zookeeper(持续更新)

VIP-01 Zookeeper特性与节点数据类型详解 文章目录 VIP-01 Zookeeper特性与节点数据类型详解正文1. 什么是Zookeeper?2. Zookeeper 核心概念2.1、 文件系统数据结构2.2、监听通知机制2.3、Zookeeper 经典的应用场景3.2. 使用命令行操作zookeeper 正文 什么是Zookee…

VMware Workstation虚拟机CentOS 7.9 配置固定ip的步骤

VMware Workstation虚拟机CentOS7.9配置固定ip的步骤 编辑虚拟机 打开VMware Workstation。 选择要配置的虚拟机,但不要启动它。 点击“编辑虚拟机设置”(Edit virtual machine settings)。 选择“网络适配器”(Network Adapter&…

JAVA基础学习笔记-day13-数据结构与集合源1

JAVA基础学习笔记-day13-数据结构与集合源1 1. 数据结构剖析1.1 研究对象一:数据间逻辑关系1.2 研究对象二:数据的存储结构(或物理结构)1.3 研究对象三:运算结构1.4 小结 2. 一维数组2.1 数组的特点 3. 链表3.1 链表的…

c++牛客总结

一、c/c语言基础 1、基础 1、指针和引用的区别 指针是一个新的变量,指向另一个变量的地址,我们可以通过这个地址来修改该另一个变量; 引用是一个别名,对引用的操作就是对变量本身进行操作;指针可以有多级 引用只有一…

【React系列】Redux(二)中间件

本文来自#React系列教程:https://mp.weixin.qq.com/mp/appmsgalbum?__bizMzg5MDAzNzkwNA&actiongetalbum&album_id1566025152667107329) 一. 中间件的使用 1.1. 组件中异步请求 在之前简单的案例中,redux中保存的counter是一个本地定义的数据…

docker (portainer 安装nginx)

汉化版步骤可以参考:写文章-CSDN创作中心https://mp.csdn.net/mp_blog/creation/editor/135258056 一、创建容器 二、配置端口,以及容器卷挂载 挂载目录配置:(下方截图的目录如下,docker 改为 mydocker,用docker作为根…

pytest安装失败,报错Could not find a version that satisfies the requirement pytest

问题 安装pytest失败,尝试使用的命令有 pip install pytest pip3 install pytest pip install -U pytest pip install pytest -i https://pypi.tuna.tsinghua.edu.cn/simple但是都会报同样的错: 解决方案 发现可能是挂了梯子的原因,关掉…

Git保姆级安装教程

Git保姆级安装教程 一、去哪下载二、安装2.1 具体安装步骤2.2 设置全局用户签名 一、去哪下载 1、官网(有最新版本):https://git-for-windows.github.io/ 2、本人学习时安装的版本,链接:https://pan.baidu.com/s/1uAo…

【完整流程】实现STM32+ESP8266+MQTT+阿里云+APP——【第二节-编写STM32程序初步实现ESP8266上云发布订阅消息】

🌟博主领域:嵌入式领域&人工智能&软件开发 前言:本节实现,硬件连接STM32与ESP8266,编写STM32程序通过at命令方式实现STM32ESP8266与阿里云物联网平台发布订阅消息,本节最终实现初步的发布订阅消息…

asp.net core使用gb2312编码

nuget包 安装System.Text.Encoding.CodePages 使用 //将byte[]转化为gb2312的字符串,要确保byte[]是存储的gb2312的字符串,要不然会乱码 string ToGb213(byte[] str) {//首先需要注册Encoding.RegisterProvider(CodePagesEncodingProvider.Instance)…