基于埋点日志数据的网络流量统计 - PV、UV

news2025/1/13 10:19:55

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

一、 网站总流量数统计 - PV

  1. 需求分析

  2. 代码实现

   方式一

   方式二

   方式三:使用process算子实现

   方式四:使用process算子实现

二、网站独立访客数统计 - UV

  1. 需求分析

  2. 代码实现


一、 网站总流量数统计 - PV

  PV全称 Page View,也就是一个网站的页面浏览量。每当用户进入网站加载或者刷新某个页面时,就会给该网站带来PV量,它往往用来衡量一个网站的流量和用户活跃度。当然了,单个指标并不能全面的反映网站的实际情况,往往需要结合其他的指标进行分析。

1. 需求分析

  埋点采集到的数据格式大概是这个样子(文件已上传资源)

在这里插入图片描述第一个是userId、第二个是itemId、第三个是categoryId、第四个是behavior、第五个是timestamp

所以我们要统计PV的话要先从第四列中筛选出PV,然后再进行累加,求出最终的PV

2. 代码实现

方式一:
  先用 readTextFile()读取文件,然后将读取到的每行数据封装成一个bean对象,再通过 filter过滤出我们需要的PV数据,这时得到的都是封装好的一个个对象没法直接sum,所以通过 map将数据映射为一个个的元组类型(PV,1)然后,使用 keyBy()将他们分到同一个并行度中进行 sum,得出最终的结果。
public class Flink01_Project_PV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 将数据封装成 UserBehavior 对象
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] data = line.split(",");
                        return new UserBehavior(
                                Long.valueOf(data[0]),
                                Long.valueOf(data[1]),
                                Integer.valueOf(data[2]),
                                data[3],
                                Long.valueOf(data[4]));
                    }
                })
                // 过滤出行为为PV的数据
                .filter(new FilterFunction<UserBehavior>() {
                    @Override
                    public boolean filter(UserBehavior value) throws Exception {
                        return "pv".equals(value.getBehavior());
                    }
                })
                // 因为直接求和的话没法求,所以做一次映射,映射成 (PV,1) 这样的结构
                .map(new MapFunction<UserBehavior, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(UserBehavior value) throws Exception {
                        return Tuple2.of(value.getBehavior(),1L);
                    }
                })
                // 然后 将他们通过key进行分组,进入同一个并行度里面 进行求和
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 进行求和
                .sum(1)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

方式二:

  这种方式省去了方式一的封装对象,其他的思路都一样。

public class Flink01_Project_PV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 直接过滤出我们想要的数据
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        String[] data = value.split(",");
                        return "pv".equals(data[3]);
                    }
                })
                // 然后将结构转换为元组类型 (PV,1)
                .map(new MapFunction<String, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] data = value.split(",");
                        return Tuple2.of(data[3],1L);
                    }
                })
                // 通过key分组
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 求和
                .sum(1)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

方式三:使用process算子实现

  首先使用readTextFile读取数据,使用map将读取到的数据封装为对象,然后使用keyBy进行分组,最后使用process算子进行求解

  • 为什么要使用keyBy():目的是让pv数据进入同一个并行度,如果不使用直接process的话,两个并行度里面都有一个sum,结果就不对了
  • 为什么不使用filter过滤呢?因为我们的过滤逻辑是再process里面完成的,所以不用再额外过滤
public class Flink02_Project_PV_process {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 封装成 UserBehavior 对象
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] data = line.split(",");
                        return new UserBehavior(
                                Long.valueOf(data[0]),
                                Long.valueOf(data[1]),
                                Integer.valueOf(data[2]),
                                data[3],
                                Long.valueOf(data[4]));
                    }
                })
                // 通过key分组
                .keyBy(new KeySelector<UserBehavior, String>() {
                    @Override
                    public String getKey(UserBehavior value) throws Exception {
                        return value.getBehavior();
                    }
                })
                // 使用proces算子实现 PV 的统计
                .process(new ProcessFunction<UserBehavior, String>() {
                    // 定义累加变量
                    long sum =0L ;
                    @Override
                    public void processElement(UserBehavior value, Context ctx, Collector<String> out) throws Exception {
                        // 判断用户行为是否是PV
                        if ("pv".equals(value.getBehavior())){
                            // 条件满足 sum+1
                            sum++;
                            // 将结果收集
                            out.collect("pv = "+sum);
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

方式四:使用process算子实现

  方式四相比于方式三省去了对象的封装,其他思路一样。

public class Flink02_Project_PV_process {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 通过key分组
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }
                })
                // 直接使用process求 PV
                .process(new ProcessFunction<String, String>() {
                    // 定义累加变量
                    long sum = 0L;
                    @Override
                    public void processElement(String line, Context ctx, Collector<String> out) throws Exception {
                    	// 将过来的每行数据切割
                        String[] datas = line.split(",");
                        // 判断是否是我们想要的数据
                        if("pv".equals(datas[3])){
                        	// 符合条件,将累加变量+1
                            sum++;
                            // 收集结果
                            out.collect("pv = "+sum);
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述


二、网站独立访客数统计 - UV

  UV全称 Unique Visitor,也就是独立访客数。在PV中,我们统计的是所有用户对所有页面的浏览行为,也就是同一个用户的浏览行为会被重复统计。实际上我们关注的是在某一特定范围内(一天、一周或者一个月)内访问该网站的用户数,也就是每个访客只计算一次。它能从侧面反映出该网站的受欢迎程度和用户规模的大小。

1. 需求分析

  要统计UV量的话,只需要对全量的PV,使用userId去重,然后就能得到独立访客数了。

2. 代码实现

  先filter过滤出PV数据,然后通过keyBy将PV分到同一组,然后使用process进行处理,处理方法是:用set集合存放userId,如果下一个userId可以加入该集合说明是一个新的独立访客,则收集当前集合的大小,若加入失败,说明集合中已经存在该userId,也就是不是一个新的独立访客,也就不做处理了。

public class Flink03_Project_UV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 过滤出 PV 数据
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return "pv".equals(value.split(",")[3]);
                    }
                })
                // 将PV的数据分到同一个组里面
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }
                })
                // 对同一组里面的数据进行处理
                .process(new ProcessFunction<String, String>() {
                    // 存放 userId 的容器,回自动对数据进行去重,最后直接拿它的大小就知道UV了
                    Set<Long> userIdSet = new HashSet<>();
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                        Long userId = Long.valueOf(value.split(",")[0]);
                        // 向set中添加userId,判断是否添加成功
                        if (userIdSet.add(userId)) {
                            // 添加成功的话,说明是一个新的独立访客,收集到此时容器大小
                            out.collect("UV = "+ userIdSet.size());
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/830441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新闻稿代写软件有哪些?聪明灵犀工具助你撰写合格新闻稿

新闻稿代写软件有哪些&#xff1f;新闻稿是一种重要的宣传工具&#xff0c;但是撰写优秀的新闻稿需要一定的写作技巧和经验。幸运的是&#xff0c;现在有许多新闻稿代写软件可供使用&#xff0c;这些工具可以帮助你撰写出更优质的新闻稿。本文将介绍一些常用的新闻稿代写软件以…

【性能调优-实例演示】CPU爆了怎么定位问题--》调试指令性能分析工具

性能调优 定位生产性能问题 生产环境&#xff0c;CPU Memory 告警 top&#xff1a;找出占CPU比较高的进程${pid}&#xff08;内存增长&#xff0c;CPU居高不下&#xff09;top -Hp ${pid}&#xff1a;显示所有线程的CPU占比&#xff0c;观察进程中的线程&#xff0c;找出哪个…

Vol的学习

首先学习基础用法 1.查看系统基本信息 vol.py -f 路径 imageinfo 2.查看进程命令行 vol.py -f 路径 --profile系统版本 cmdline vol.py -f 路径 --profile版本 cmdscan 3.查看进程信息 vol.py -f 路径 --profile系统 pslist 通过树的方式返回 vol.py -f 路径 --profile系统…

淘宝资源采集(从零开始学习淘宝数据爬取)

1. 为什么要进行淘宝数据爬取&#xff1f; 淘宝数据爬取是指通过自动化程序从淘宝网站上获取数据的过程。这些数据可以包括商品信息、销售数据、评论等等。淘宝数据爬取可以帮助您了解市场趋势、优化您的产品选择以及提高销售额。 淘宝作为全球的电商平台&#xff0c;每天都有…

从初学者到专家:Java 数据类型和变量的完整指南

目录 一、字面常量 1.1什么常量&#xff1f; 1.2常见的六种常量类型 二、数据类型 2.1什么是数据类型&#xff1f; 2.2基本数据类型&#xff1a; 2.3引用数据类型 三、变量 3.1什么是变量&#xff1f; 3.2变量的命名规则 3.3变量的作用域 3.4变量的被final修饰 四…

C++ 外部变量和外部函数

1.外部变量 如果一个变量除了在定义它的源文件中可以使用外&#xff0c;还能被其他文件使用&#xff0c;那么就称这个变量为外部变量。命名空间作用域中定义的变量&#xff0c;默认情况下都是外部变量&#xff0c;但在其他文件中如果需要使用这一变量&#xff0c;需要用extern…

CAS - 原理简介

CAS是JDK提供的非阻塞原子操作&#xff0c;它通过硬件保证了比较-更新的原子性。它是非阻塞的且自身具有原子性&#xff0c;也就是说CAS效率高、可靠。CAS是一条CPU的原子指令(cmpxchg指令)&#xff0c;不会造成所谓的数据不一致问题&#xff0c;Unsafe类提供的CAS方法&#xf…

【内网穿透】内网穿透应用场景

伴随着科学技术的进步&#xff0c;我们身边出现了越来越多的电子设备&#xff0c;特别是移动电子设备的普及&#xff0c;给我们的生活带来极大的便利&#xff0c;而软件技术的发展&#xff0c;更为这些软件设备带来更多应用的可能。虽然移动设备覆盖了了我们生活的绝大部分场景…

Mybatis引出的一系列问题-JDBC 的探究

1 引入对JDBC的理解-1 一般来说&#xff0c;Java应用程序访问数据库的过程是&#xff1a; 装载数据库驱动程序&#xff1b;通过jdbc建立数据库连接&#xff1b;访问数据库&#xff0c;执行sql语句&#xff1b;断开数据库连接。 Public void FindAllUsers(){//1、装载sqlserve…

面试热题(接雨水问题)

给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 我们看到题的第一步&#xff0c;永远是对入参进行判断 public int trap(int[] height) {if (height null) {return 0;}...} 但是我们想想看&#xff0c;接…

uniapp微信小程序底部弹窗自定义组件

基础弹窗效果组件 <template><view><viewclass"tui-actionsheet-class tui-actionsheet":class"[show ? tui-actionsheet-show : ]"><view class"regional-selection">底部弹窗</view></view><!-- 遮罩…

教你使用Pyinstaller将Python源码打包成可执行程序exe的方法

pyinstaller是一个常用的Python打包工具&#xff0c;可以将Python程序打包成独立的可执行文件&#xff0c;支持Windows、Linux和macOS等平台。 ★★★Pyinstaller有许多参数&#xff0c;以下是其中一些主要参数的含义&#xff1a; -F, --onefile&#xff1a;打包一个单个文件…

Excel如何把两列互换

第一步&#xff1a;选择一列 打开excel&#xff0c;选中一列后将鼠标放在列后&#xff0c;让箭头变成十字方向。 第二步&#xff1a;选择Shift键 按住键盘上的Shift键&#xff0c;将列往后移动变成图示样。 第三步&#xff1a;选择互换 完成上述操作后&#xff0c;松开鼠标两…

Netty框架:ByteBuf空间不够时自动扩充空间

说明 使用Netty的ByteBuf&#xff0c;空间不够时可以自动扩充。扩充时&#xff0c;不是一个字节一个字节的扩充&#xff0c;而是扩充一段空间。对于不同的ByteBufAllocator实现&#xff0c;每次扩充的空间大小也不相同。 代码举例 使用Unpooled分配ByteBuf 下面这段代码&am…

数据结构 | 搜索和排序——搜索

目录 一、顺序搜索 二、分析顺序搜索算法 三、二分搜索 四、分析二分搜索算法 五、散列 5.1 散列函数 5.2 处理冲突 5.3 实现映射抽象数据类型 搜索是指从元素集合中找到某个特定元素的算法过程。搜索过程通常返回True或False&#xff0c;分别表示元素是否存在。有时&a…

快速WordPress个人博客并内网穿透发布到互联网

快速WordPress个人博客并内网穿透发布到互联网 文章目录 快速WordPress个人博客并内网穿透发布到互联网 我们能够通过cpolar完整的搭建起一个属于自己的网站&#xff0c;并且通过cpolar建立的数据隧道&#xff0c;从而让我们存放在本地电脑上的网站&#xff0c;能够为公众互联网…

Qt+GDAL开发笔记(二):在windows系统msvc207x64编译GDAL库、搭建开发环境和基础Demo

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/132077288 红胖子网络科技博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…

阿里云平台注册及基础使用

首先进入阿里云官网&#xff1a; 阿里云-计算&#xff0c;为了无法计算的价值 点击右上角“登录/注册”&#xff0c;如果没有阿里云账号则需要注册。 注册界面&#xff1a; 注册完成后需要开通物联网平台公共实例&#xff1a; 注册成功后的登录&#xff1a; 同样点击右上角的…

改进的智能优化算法定性分析:探索与开发分析(Analysis of the exploration and exploitation)

目录 一、改进的智能优化算法探索与开发分析 二、GWO1与GWO探索与开发分析运行结果 三、GWO1与GWO探索与开发实验对比分析 四、代码获取 一、改进的智能优化算法探索与开发分析 智能优化算法旨在将搜索过程分为两个阶段&#xff1a;探索和开发。平衡这两个阶段对于增强算法…

智能化RFID耳机装配系统:提升效率、精准追踪与优化管理

智能化RFID耳机装配系统&#xff1a;提升效率、精准追踪与优化管理 在当今的智能化时代&#xff0c;无线射频识别技术&#xff08;RFID&#xff09;被广泛应用于各个行业。本文将介绍一种基于RFID技术的智能耳机装配案例&#xff0c;通过RFID技术实现耳机装配过程的自动化控制…