大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化

news2025/1/15 17:24:03

文章目录

  • 0 前言
  • 1、环境准备
    • 1.1 flink 下载相关 jar 包
    • 1.2 生成 kafka 数据
    • 1.3 开发前的三个小 tip
  • 2、flink-sql 客户端编写运行 sql
    • 2.1 创建 kafka 数据源表
    • 2.2 指标统计:每小时成交量
      • 2.2.1 创建 es 结果表, 存放每小时的成交量
      • 2.2.2 执行 sql ,统计每小时的成交量
    • 2.3 指标统计:每10分钟累计独立用户数
      • 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
      • 2.3.2 创建视图
      • 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
    • 2.4 指标统计:商品类目销量排行
      • 2.4.1 创建商品类目维表
      • 2.4.1 创建 es 结果表,存放商品类目排行表
      • 2.4.2 创建视图
      • 2.4.3 执行 sql , 统计商品类目销量排行
  • 3、最终效果与体验心得
    • 3.1 最终效果
    • 3.2 体验心得
      • 3.2.1 执行
      • 3.2.2 存储
  • 4 最后


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH (
    'connector.type' = 'kafka', 
    'connector.version' = 'universal',  
    'connector.topic' = 'user_behavior',  
    'connector.startup-mode' = 'earliest-offset', 
    'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 
    'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 
    'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'update-mode' = 'append',
    'format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',    
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (
    sub_category_id BIGINT,
    parent_category_name STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (
    category_name  STRING,
    buy_cnt  BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1358501.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

提升网络安全重要要素IP地址

在数字化时代,网络安全已经成为人们关注的焦点。本文将深入探讨网络安全与IP地址之间的紧密联系,以及IP地址在构建数字世界的前沿堡垒中的关键作用。 网络安全是当今数字社会中不可忽视的挑战之一。而IP地址,作为互联网通信的基础协议&#…

震惊!原来这就是JavaScript闭包的秘密

📢 鸿蒙专栏:想学鸿蒙的,冲 📢 C语言专栏:想学C语言的,冲 📢 VUE专栏:想学VUE的,冲这里 📢 CSS专栏:想学CSS的,冲这里 &#x1f4…

第一届能源电子产业创新大赛太阳能光伏赛道决赛及颁奖仪式在宜宾成功举办

在工业和信息化部电子信息司指导下,由工业和信息化部产业发展促进中心和宜宾市人民政府主办,宜宾市经济和信息化局、宜宾高新技术产业园区管理委员会承办的第一届能源电子产业创新大赛太阳能光伏赛道决赛及颁奖仪式于2024年1月3日-5日在宜宾市成功举办。…

DDIA 第十一章:流处理

本文是《数据密集型应用系统设计》(DDIA)的读书笔记,一共十二章,我已经全部阅读并且整理完毕。 采用一问一答的形式,并且用列表形式整理了原文。 笔记的内容大概是原文的 1/5 ~ 1/3,所以你如果没有很多时间…

2023年后,AI 还有什么研究方向有前景?

什么是AI ​ AI代表人工智能,它是指通过计算机科学技术使机器能够执行需要智力的任务的一种技术。这些任务包括学习、推理、问题解决和感知等,通常是人类智能的表现。人工智能的目标是使计算机系统能够执行需要人类智力的任务,而不需要人类的…

C语言实用第三方库Melon开箱即用之多线程模型

在之前的文章中(开发利器——C 语言必备实用第三方库),笔者介绍了一款Linux/UNIX下C语言库Melon的基本功能,并给出了一个简单的多进程开箱即用的例子。 本文将给大家介绍Melon中多线程的使用方法。 在Melon中有三种多线程模式&a…

点对点SDWAN组网:通过专线连接实现企业分支互联

点对点SDWAN是一种通过软件定义网络技术将企业分支互联的组网解决方案。在点对点SDWAN中,企业分支通过专线连接实现互联,以满足对网络性能和可靠性的要求。 传统的WAN架构通常使用MPLS(多协议标签交换)技术来实现企业分支的互联。…

TransmittableThreadLocal使用踩坑

背景:为了获取相关字段方便,项目里使用了TransmittableThreadLocal上下文,在异步逻辑中get值时发现并非当前请求的值,且是偶发状况(并发问题)。 发现:TransmittableThreadLocal是阿里开源的可以实现父子线程值传递的工…

鸿蒙系列--动态共享包的依赖与使用

一、前言 HarmonyOS的共享包相当于Android的Library,在HarmonyOS中,给开发者提供了两种共享包,HAR(Harmony Archive)静态共享包,和HSP(Harmony Shared Package)动态共享包 区别&…

记一次 .NET 某新能源材料检测系统 崩溃分析

一:背景 1. 讲故事 上周有位朋友找到我,说他的程序经常会偶发性崩溃,一直没找到原因,自己也抓了dump 也没分析出个所以然,让我帮忙看下怎么回事,那既然有 dump,那就开始分析呗。 二&#xff…

计算机创新协会冬令营——暴力枚举题目05

这道题挺基础但是挺多坑的。(•́へ•́╬) 题目 204. 计数质数 - 力扣(LeetCode) 给定整数 n ,返回 所有小于非负整数 n 的质数的数量 。 示例 示例 1: 输入:n 10 输出:4 解释:小于 10 的质…

具有大电流,双通道 12V,短地短电源保护等功能的国产芯片GC8549 可替代ONSEMI的LV8548/LV8549

GC8549 可以工作在 3.8~12V 的电源电压上,每 通道能提供高达 1.5A 持续输出电流或者 2.5A 峰值 电流,睡眠模式下功耗小于 1uA。具有 PWM(IN/EN)输入接口,与行业标 准器件兼容,并具有过温保护,欠压保护&…

信息系统项目管理师好考吗?知识点分析与讲解,码住!

科目一:综合知识考试 科目一考试是由选择题组成的,共有75道题目。考试时间为早上9点到11点半,可以提前交卷,通常11点左右就能离开考场。对于会做的题目,要及时解答,对于不会做的题目,花费过多时…

QC/PD快充电源产品MOS选型分析

• 原边650-700V SJ MOSFET采用低FOM值的ESM 技术,有利于提高系统效 率, 以及更佳的EAS和EMI等特性,对于一些不含PFC电路的系统更友好。 • 副边采用低FOM值的SGT同步整流电路,相比肖特基二极管整流能有更低的 损耗,有…

pinia 给 state 指定变量类型

pinia 给 state 指定变量类型 问题描述 自从用 vitetsvue3 以来,我一直有一个很大的疑问,就是 pinia 中的 state 变量类型该从哪定义,如何定义它? 因为我在使用未定义类型的 state 变量的时候一直会有一个提示,提示说…

JAVA集合框架总结

集合框架概述 1.1 生活中的容器 1.2 数组的特点与弊端 一方面,面向对象语言对事物的体现都是以对象的形式,为了方便对多个对象的操作,就要对对象进行存储。另一方面,使用数组存储对象方面具有一些弊端,而Java 集合就…

Unity之预制体与变体

PS:不用说了,我在写博客就是在摸鱼 一、预制体 不知道大家小时候有没有看过火影,记得剧情最开始的时候水木哄骗鸣人去偷封印之书,反而让鸣人学会了多重影分身之术: 好了,小编绞尽脑子终于想好怎么向大家介绍预制体了&a…

element中Tree 树形控件实现多选、展开折叠、全选全不选、父子联动、默认展开、默认选中、默认禁用、自定义节点内容、可拖拽节点、手风琴模式

目录 1.代码实现2. 效果图3. 使用到的部分属性说明4. 更多属性配置查看element官网 1.代码实现 <template><div class"TreePage"><el-checkboxv-model"menuExpand"change"handleCheckedTreeExpand($event, menu)">展开/折叠&l…

亚马逊店铺遇到账号申诉模版分享

1.表达诚意&#xff0c;先认错再说&#xff1a;我知道&#xff0c;最近我们在Amazon.com上作为卖家的表现已经低于亚马逊和我们自己的质量标准。 2.清楚分明的格式&#xff1a;我们库存管理的混乱导致了延迟发货&#xff0c;更糟糕的是&#xff0c;物品无法使用。当延迟发货和…

00 项目结构

文章目录 后端 后端 后端 - sky-common包 公共类&#xff0c;工具类&#xff0c;常量类- constant 常量类- context 上下文有关的- enumenation 枚举- exception 自定义异常类- json json处理类- properties boot相关的配置属性类- result 结果类- uti…