计算机毕设 flink大数据淘宝用户行为数据实时分析与可视化

news2025/1/23 11:55:09

文章目录

  • 0 前言
  • 1、环境准备
    • 1.1 flink 下载相关 jar 包
    • 1.2 生成 kafka 数据
    • 1.3 开发前的三个小 tip
  • 2、flink-sql 客户端编写运行 sql
    • 2.1 创建 kafka 数据源表
    • 2.2 指标统计:每小时成交量
      • 2.2.1 创建 es 结果表, 存放每小时的成交量
      • 2.2.2 执行 sql ,统计每小时的成交量
    • 2.3 指标统计:每10分钟累计独立用户数
      • 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
      • 2.3.2 创建视图
      • 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
    • 2.4 指标统计:商品类目销量排行
      • 2.4.1 创建商品类目维表
      • 2.4.1 创建 es 结果表,存放商品类目排行表
      • 2.4.2 创建视图
      • 2.4.3 执行 sql , 统计商品类目销量排行
  • 3、最终效果与体验心得
    • 3.1 最终效果
    • 3.2 体验心得
      • 3.2.1 执行
      • 3.2.2 存储
  • 4 最后


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH (
    'connector.type' = 'kafka', 
    'connector.version' = 'universal',  
    'connector.topic' = 'user_behavior',  
    'connector.startup-mode' = 'earliest-offset', 
    'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 
    'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 
    'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'update-mode' = 'append',
    'format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',    
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (
    sub_category_id BIGINT,
    parent_category_name STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (
    category_name  STRING,
    buy_cnt  BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1019416.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

双喜临门 | 开源网安SFuzz在谷歌FuzzBench中夺魁,并获得CWE国际认证

​近日,开源网安模糊测试平台SFuzz于上半年推出的“灰盒代码模糊测试系统”,经过数月的不断调优,在谷歌的FuzzBench基准测试中脱颖而出,荣登榜首。此外,SFuzz还成功获得了CWE国际兼容性认证,进一步巩固了其…

警报:新的 Kubernetes 漏洞可对 Windows 端点实施远程攻击

不久前,研究人员在 Kubernetes 中发现的三个可被利用并相互关联的高危安全漏洞,这些漏洞可在集群内的 Windows 端点上以提升权限的方式实现远程代码执行。 这些漏洞被标记为 CVE-2023-3676、CVE-2023-3893 和 CVE-2023-3955,CVSS 评分为 8.8…

第73步 时间序列建模实战:多步滚动预测 vol-1(以决策树回归为例)

基于WIN10的64位系统演示 一、写在前面 上一期,我们讲了单步滚动预测,一次只预测一个值。 既然有单步,有没有多步呢?那肯定有,这一期来介绍多步滚动预测。 然而,多步滚动模型也可以有不同的步骤&#x…

Mac电脑运行太卡怎么办?

许多小伙伴使用Mac后都反馈电脑不如想象中的流畅,甚至有点卡顿的现象,原因可能是因为无用的应用占据了过多的内存,或者是系统盘垃圾过多,导致的电脑卡顿现象。 今天小编教给大家几招,让自己的Mac能够一键重生&#xf…

创造性地解决冲突

1、冲突的根本原因是矛盾双方存在不可调和的目标冲突。 2、要知己知彼: 知己:就是对自己的问题、需求进行客观定义,说明需求和问题的意义或价值、阐述解决方案和期望效果; 知彼:站在对方立场,深挖对方真…

探索AIGC人工智能(Midjourney篇)(四)

文章目录 Midjourney模特换装 Midjourney制作APP图标 Midjourney网页设计 Midjourney如何生成IP盲盒 Midjourney设计儿童节海报 Midjourney制作商用矢量插画 Midjourney设计徽章 Midjourney图片融合 Midjourney后缀参数 Midjourney模特换装 关键词生成模特照片 中国女性模特的…

虚拟机Ubuntu操作系统常用终端命令(3)(详细解释+详细演示)

本篇概要 本篇讲述了Ubuntu操作系统常用的几个功能,即修改文件权限、修改文件属性、可执行脚本、虚拟机网络、FTP服务器、SSH服务器、VIM等方面的知识。希望能够得到大家的支持。 文章目录 本篇概要1.修改文件权限2.修改文件属主3.可执行脚本3.1要点与细节3.2shell…

河北吉力宝打造步力宝智能康养鞋,助力健康中国行

据国家统计局数据,2022年我国总人口减少85万人,正式迈入人口负增长时代。人口老龄化程度的进一步加深令劳动力愈加不足,加之“421”的家庭结构,仅依靠政府性养老事业已难以支撑。 当老年群体对品质消费越来越看重时,康…

9万多条执业医师资格考试题库ACCESS数据库

《9万多条执业医师资格考试题库ACCESS数据库》搜集了大量执业医师资格考试试题,包括临床执业医师资格考试试题、口腔执业医师资格考试试题、中医执业医师资格考试试题、中西医结合执业医师资格考试试题、公卫执业医师资格考试试题等。 分类情况包含:临床…

MT4深受投资者喜欢,anzo capital昂首资本认为这几个特点必不可少

MT4为什么深受投资者喜欢,anzo capital昂首资本总结这几个特点,一起分享给各位投资者。 一.专业性 MT4平台是专门为外汇投资交易而设计的。投资者可以在指数、股票、债券、商品和各种货币上交易货币和差价合约。 二.兼容性 MT4不仅可用于32位操作系统…

对于每种情况分别统计概率来计算期望+树上连通块统计:ARC165E

https://atcoder.jp/contests/arc165/tasks/arc165_e 考虑一个常见套路,我们对每个连通块统计其概率,设为 p ( T ) p(T) p(T),则答案为 ∑ ∣ T ∣ > k p ( T ) \sum_{|T|>k} p(T) ∣T∣>k∑​p(T) 可以想成对于每个大小大于 k …

RockyLinux安装MariaDB

文章目录 1 前言2 参考3 开始安装3.1 运行官方脚本 添加 MariaDB 的源3.2 安装 MariaDB Server3.3 启动 MariaDB 4 SSH 登录 MariaDB4.1 ssh 上使用 root 账号登录4.2 新建管理员账号并授权 5 放行端口 33065.1 VirtualBox 上设置端口转发5.2 Rocky Linux 防火墙放行 3306 端口…

nacos服务端--切换数据源

nacos服务端版本:2.3.0-SNAPSHOT 在nacos的服务端,需改application.properties文件 #*************** Config Module Related Configurations ***************# ### Deprecated configuration property, it is recommended to use spring.sql.init.platf…

BK698CPA15B0 创建了通用电气数字工业发展指数

BK698CPA15B0 创建了通用电气数字工业发展指数 基于调查研究,通用电气创建了通用电气数字工业发展指数,以跟踪数字化转型的实际进展——从对IIoT的展望到准备好转型。该创始指数在100分制中的总得分为63,表明尽管工业互联网的前景非常强劲&a…

3.2-3.4 Qt样式表使用补充说明

本期内容 3.2 设置样式的几种方式—— 3.2.1 不同方式使用介绍—— 3.2.2 不同方式的优缺点3.3 样式表使用过程中产生的冲突-分析及解决3.4 各控件中常用样式讲解及说明3.2 设置样式的几种方式 我们通常在使用Qt开发的过程中都会使用样式表来美化我们的界面,关于如何使用样式…

input修改checkbox复选框默认选中样式

问题描述&#xff1a; <input type"checkbox" /> input修改checkbox默认选中样式&#xff0c;直接设置选中后的样式不生效&#xff0c;需要先给复选框设置-webkit-appearance: none&#xff08;取消默认样式&#xff09;&#xff0c; 再设置样式才会生效。 …

Vue中的路由懒加载:提高性能和用户体验

Vue中的路由懒加载&#xff1a;提高性能和用户体验 在现代Web应用程序中&#xff0c;性能和用户体验是至关重要的。为了加速页面加载速度和提高用户感知的响应性&#xff0c;Vue提供了一种路由懒加载的方法。本文将详细介绍Vue中如何进行路由懒加载&#xff0c;并提供代码示例…

充分利用学习平台,提升个人职业竞争力

在当今竞争激烈的职场环境中&#xff0c;个人职业竞争力的提升变得至关重要。而充分利用学习平台成为了我们提升竞争力的一种有效途径。学习平台不仅可以提供丰富多样的学习资源&#xff0c;还能提升个人技能和知识水平&#xff0c;让我们更具竞争力。 学习平台提供了丰富的学…

图扑邀您共聚 IOTE 国际物联网展·深圳站 | 展会预告

参展时间&#xff1a;9 月 20 日- 22 日 图扑展位&#xff1a;9 号馆 9B 35-1 参展地址&#xff1a;深圳国际会展中心&#xff08;宝安新馆&#xff09; IOTE 2023 第二十届国际物联网展深圳站&#xff0c;将于 9 月 20 日- 22 日在深圳国际会展中心&#xff08;宝安&#xff0…

Python 之利用matplotlib.pyplot 生成图形和图表

文章目录 介绍运用 介绍 matplotlib.pyplot是Matplotlib库的一个子模块&#xff0c;它提供了一个简单的界面来创建各种类型的图形和图表。使用pyplot&#xff0c;您可以轻松创建、定制和显示图形&#xff0c;而无需编写大量的底层代码。以下是matplotlib.pyplot的一些常见用法…