【大数据学习篇11】Spark项目实战~网站转化率统计

news2024/11/18 7:42:32

学习目标/Target

掌握广告点击流实时统计实现思路

掌握利用Kafka生产用户广告点击流数据

了解数据库设计

掌握如何创建Spark Streaming连接

掌握利用Spark Streaming读取业务数据

掌握利用Spark读取黑名单用户

掌握利用Spark Streaming过滤黑名单用户

掌握利用Spark Streaming统计每个城市不同广告的点击次数

掌握利用Spark Streaming添加黑名单用户

掌握将数据持久化到HBase数据库

熟悉如何利用HBase Shell命令向HBase数据库的表中添加数据

概述

        电商网站通常会存在一些广告位,当用户浏览网站时投放的广告内容会在对应广告位显示。此时,有些用户可能会点击广告跳转到对应界面去查看详情,从而提升用户在网站的浏览深度和购买概率,针对这种用户广告点击行为的实时数据进行实时计算和统计,可以帮助公司实时地掌握各种广告的投放效果,以便于后续能够及时地对广告投放相关的策略进行调整和优化,以期望通过广告的投放获取更高的收益。

目录

学习目标/Target

概述

1.数据集分析

2.实现思路分析

3.数据库设计

4. 实现广告点击流实时统计

5.运行程序


1.数据集分析

        本需求采用Java程序模拟生成用户广告点击数据,通过Kafka的生产者发布用户广告点击数据形成实时数据流,数据流中的每一条数据代表一个用户点击广告的行为,当Kafka生产者程序运行时会产出源源不断的用户广告点击流数据。

1596006895171,16,6,tianjin

        单条用户广告点击数据包含四个字段内容,依次分别是时间戳(time)、用户ID(userid)、广告ID(adid)和城市(city)。

2.实现思路分析

        通过Kafka实时生产用户广告点击流数据,SparkStreaming作为消费者实时读取Kafka生产的数据,与HBase数据库中黑名单用户表的数据进行合并,并过滤包含黑名单用户的数据。对过滤后的数据进行两次聚合操作,第一次聚合统计每个广告在不用城市的点击次数。第二次聚合统计用户出现的次数,用于将广告点击次数超过100的用户添加到黑名单用户中。

读取:读取Kafka实时生产用户广告点击流数据。

转换:将数据格式转换为以userid为Key,adid和city作为一个整体为Value的数据形式。

合并/过滤:将转换后的数据与读取的黑名单用户数据进行合并,并过滤包含黑名单用户的数据。

转换/聚合:将数据格式转换为以adid和city作为一个整体为Key,数值1作为Value的数据形式。

转换/聚合:将数据格式转换为以userid作为Key,值1作为Value的数据形式,然后进行聚合操作统计每个用户出现的次数。

读取:读取HBase数据库中黑名单用户

添加:将用户出现次数超过100的用户添加到HBase数据库中的黑名单用户中。

3.数据库设计

        读取HBase数据库中黑名单用户 将转换后的数据与读取的黑名单用户数据进行合并。

 数据表adstream:存储用户广告点击流实时统计结果。

 数据表blacklist:存储黑名单用户。

STEP  01

打开HBase命令行工具:

        打开虚拟机启动大数据集群环境(此时可以不启动使用远程连接工具SecureCRT连接虚拟机Spark01,执行“hbase shell”命令进入HBase的命令行工具。

STEP  02

创建表blacklist:

        通过HBase的命令行工具创建表blacklist并指定列族为black_user,用于存储黑名单用户数据。

 create 'blacklist','black_user'

STEP  03

插入黑名单用户:

        通过HBase的命令行工具在表blacklist的列族black_user下插入黑名单用户,指定uerid为33、44和55的用户为黑名单用户。

STEP  04

创建表adstream:

        通过HBase的命令行工具创建表adstream并指定列族为area_ads_count,用于存储用户广告点击流实时统计结果。

create 'adstream','area_ads_count'

4. 实现广告点击流实时统计

4.1  修改pom.xml文件

在项目SparkProject的pom.xml文件中添加Spark Streaming、Hadoop和Spark Streaming整合Kafka依赖。

<dependency>

    <groupId>org.apache.hadoop</groupId>

    <artifactId>hadoop-common</artifactId>

    <version>2.7.4</version>

</dependency>

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-streaming_2.11</artifactId>

    <version>2.3.2</version>

</dependency>

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>

    <version>2.3.2</version>

</dependency>

 4.2  生产用户广告点击流数据

STEP  01

实现Kafka生产者:

        在项目SparkProject的java目录新建Package包“cn.itcast.streaming”,用于存放广告点击流实时统计的Java文件,并在该包中创建Java类文件MockRealTime,用于实现Kafka生产者,生产用户广告点击流数据。

STEP  02

启动Kafka消费者:

        打开虚拟机启动大数据集群环境(包括Kafka),使用远程连接工具SecureCRT连接虚拟机Spark01,进入Kafka安装目录(/export/servers/kafka_2.11-2.0.0)启动Kafka消费者。

bin/kafka-console-consumer.sh \

--bootstrap-server spark01:9092,spark02:9092,spark03:9092 \

--topic ad

STEP  03

启动Kafka生产者:
 
       在项目SparkProject的包“cn.itcast.streaming”中选中文件MockRealTime.java并单击右键,在弹出的菜单栏选择“Run. MockRealTime.main()”运行Kafka生产者程序,生产用户广告点击流数据。
STEP  04

查看Kafka消费者:

        在虚拟机Spark01的Kafka消费者窗口查看数据是否被成功接收。

STEP  05 

关闭Kafka消费者:

        在虚拟机Spark01的Kafka消费者窗口通过组合键“Ctrl+C”关闭当前消费者。

STEP  06

关闭Kafka生产者:

        在IntelliJ IDEA控制台中单击红色方框的按钮关闭Kafka生产者程序,关闭Kafka生产者程序。

 4.3  创建Spark Streaming连接

        在项目SparkProject的包“cn.itcast.streaming”中创建Java类文件AdsRealTime.java,用于实现广告点击流实时统计。

public class AdsRealTime {

    public static void main(String[] arg) throws IOException,

            InterruptedException {

         //实现Spark Streaming程序

    }

}

        在类AdsRealTime的main()方法中,创建JavaStreamingContext对象和SparkConf对象,JavaStreamingContext对象用于实现Spark Streaming程序,SparkConf对象用于配置Spark Streaming程序各种参数。

System.setProperty("HADOOP_USER_NAME","root");

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("stream_ad"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); jsc.checkpoint("hdfs://192.168.121.133:9000/checkpoint");

4.4  读取用户广告点击流数据

在类AdsRealTime的main()方法中,指定Kafka消费者的相关配置信息。

final Collection<String> topics = Arrays.asList("ad"); Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers","spark01:9092,spark02:9092,spark03:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "adstream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", true);

        在类AdsRealTime的main()方法中,使用类KafkaUtils的createDirectStream()方法从Kafka生产者读取用户广告点击流数据,并加载到userAdStream。

JavaInputDStream<ConsumerRecord<String, String>> userAdStream = KafkaUtils.createDirectStream(

         jsc,LocationStrategies.PreferConsistent(),

         ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)

    );

4.5  获取业务数据

        在类AdsRealTime的main()方法中,使用mapToPair()算子转换userAdStream中每一行数据生,获取用户广告点击流数据中的userid(用户ID)、adid(广告ID)和city(城市),将转化结果加载到userClickAdsStream。

JavaPairDStream<String,Tuple2<String,String>> userClickAdsStream =         userAdStream.mapToPair((PairFunction<ConsumerRecord<String, String>,                         String,Tuple2<String, String>>) record -> {

    String[] value = record.value().split(",");

    String userid =value[1];

    String adid =value[2];

    String city =value[3];

    return new Tuple2<>(userid,new Tuple2<>(adid,city));

});

4.6  读取黑名单用户数据

        在类AdsRealTime的main()方法中,使用mapToPair()算子转换userAdStream中每一行数据生,获取用户广告点击流数据中的userid(用户ID)、adid(广告ID)和city(城市),将转化结果加载到userClickAdsStream。

JavaPairDStream<String,Tuple2<String,String>> userClickAdsStream =         userAdStream.mapToPair((PairFunction<ConsumerRecord<String, String>,                         String,Tuple2<String, String>>) record -> {

    String[] value = record.value().split(",");

    String userid =value[1];

    String adid =value[2];

    String city =value[3];

    return new Tuple2<>(userid,new Tuple2<>(adid,city));

});

        在HBase数据库操作工具类HbaseUtils中添加方法scan(),用于获取HBase数据库中指定表的全部数据。

public static ResultScanner scan(String tableName)

        throws IOException {

    Table table = HbaseConnect.getConnection().getTable(TableName.valueOf(tableName));

    Scan scan = new Scan();

    return table.getScanner(scan);

}

        在类AdsRealTime中添加方法getBlackUser(),用于获取HBase数据库中黑名单用户表的数据。

public static ArrayList getBlackUser() throws IOException {

    ResultScanner blcakResult = HbaseUtils.scan("blacklist");

    Iterator<Result> blackIterator = blcakResult.iterator();

    ArrayList<Tuple2<String,String>> blackList = new ArrayList<>();

    while (blackIterator.hasNext()){

        String blackUserId = new String(blackIterator.next().value());

        blackList.add(new Tuple2<>(blackUserId,"black"));

    }

    return blackList;

}

        在类AdsRealTime的main()方法中,使用parallelizePairs()算子将存放黑名单用户的集合转换JavaPairRDD,将转换结果加载到blackUserRDD。

JavaPairRDD<String,String> blackUserRDD =                                 jsc.sparkContext().parallelizePairs(getBlackUser());

4.7  过滤黑名单用户

        在类AdsRealTime的main()方法中,使用transformToPair()算子转换userClickAdsStream的每一行数据,在转换的过程中过滤黑名单用户,将转换结果加载到checkUserClickAdsStream。

JavaPairDStream<String,Tuple2<String,String>> checkUserClickAdsStream = userClickAdsStream.transformToPair( ……

);

        在类AdsRealTime的main()方法中,使用mapToPair()算子转换checkUserClickAdsStream的每一行数据,将用户ID的值替换为1,通过areaAdsStream加载转换结果。

JavaPairDStream<Tuple2<String,String>, Integer> areaAdsStream    

= checkUserClickAdsStream.mapToPair(

            (PairFunction<

                    Tuple2<String, Tuple2<String, String>>,

                    Tuple2<String, String>,

                    Integer>) checkUserClickAdsTuple2 -> {

        String adid = checkUserClickAdsTuple2._2._1;

        String city = checkUserClickAdsTuple2._2._2;

        return new Tuple2<>(new Tuple2<>(city,adid),new Integer(1));

    });

        在类AdsRealTime的main()方法中,使用updateStateByKey()算子维护areaAdsStream的状态,用于统计每个城市不同广告的点击次数,将统计结果加载到countAreaAdsStream。

JavaPairDStream<Tuple2<String,String>, Integer> countAreaAdsStream

        = areaAdsStream.updateStateByKey(                 (Function2<List<Integer>,Optional<Integer>, Optional<Integer>>)

                        (valueList, oldState) -> {

    Integer newState = 0;

    if (oldState.isPresent()){

        newState = oldState.get();

    }

    for (Integer value : valueList){

        newState += value;

    }

    return Optional.of(newState);

});

        在类AdsRealTime的main()方法中,使用mapToPair()算子转换checkUserClickAdsStream的每一行数据,便于后续聚合统计每个用户点击广告的次数,将转换结果加载到userStream。

JavaPairDStream<String,Integer> userStream = checkUserClickAdsStream

    .mapToPair(

        (PairFunction<Tuple2<String, Tuple2<String, String>>,

            String, Integer>) checkUserClickAdsTuple2 ->

                new Tuple2<>(

                    checkUserClickAdsTuple2._1,

                    new Integer(1)));

        在类AdsRealTime的main()方法中,使用updateStateByKey()算子维护userStream的状态,用于统计每个用户点击广告的次数,将统计结果加载到countUserStream。

JavaPairDStream<String, Integer> countUserStream           =userStream.updateStateByKey((Function2<List<Integer>,Optional<Integer>,Optional<Integer>>)

            (valueList, oldState) -> {

    Integer newState = 0;

    if (oldState.isPresent()){

        newState = oldState.get();

    }

    for (Integer value : valueList){

        newState += value;

    }

    return Optional.of(newState);

});

4.9  添加黑名单用户

        在类AdsRealTime的main()方法中,使用foreachRDD()算子遍历countUserStream中的RDD,将广告点击次数超过100的用户添加到HBase数据库的黑名单表blacklist中。

countUserStream.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>)         countUserRDD -> countUserRDD.foreach((VoidFunction<Tuple2<String, Integer>>)                         countUserTuple2 -> {

    if (countUserTuple2._2>100){

        HbaseUtils.putsOneToHBase(

                "blacklist",

                "user"+countUserTuple2._1,

                "black_user",

                "userid",

                countUserTuple2._1);

    }

 }));

4.10  持久化数据

        在类AdsRealTime的main()方法中,使用foreachRDD()算子遍历countAreaAdsStream,将每个城市中不同广告的点击次数持久化到HBase数据库的adstream表。

countAreaAdsStream.foreachRDD((

        VoidFunction<JavaPairRDD<Tuple2<String, String>,Integer>>

        ) countAreaAdsRDD ->countAreaAdsRDD.foreach((VoidFunction<              Tuple2<Tuple2<String, String>, Integer>>)

                 countAreaAdsTuple2 -> {

    String adid = countAreaAdsTuple2._1._2;

    String city = countAreaAdsTuple2._1._1;

    int count = countAreaAdsTuple2._2;     HbaseUtils.putsOneToHBase("adstream",city+"_"+adid,"area_ads_count","area",city);     HbaseUtils.putsOneToHBase("adstream",city+"_"+adid,"area_ads_count","ad",adid);     HbaseUtils.putsOneToHBase("adstream",city+"_"+adid,"area_ads_count","count",String.valueOf(count)); }));

        在类AdsRealTime的main()方法中,添加启动与关闭Spark Streaming连接等方法。

jsc.start();

jsc.awaitTermination();

HbaseConnect.closeConnection();

jsc.close();

5.运行程序

         Kafka生产者程序和用户广告点击流实时统计程序启动成功后,可在IDEA的控制台查看程序运行状态。

         使用远程连接工具SecureCRT连接虚拟机Spark01,执行“hbase shell”命令,进入HBase命令行工具,在HBase命令行工具中执行“scan 'adstream'”命令,查看HBase数据库中表adstream的统计结果。

小结 

        本文主要讲解了如何通过用户广告点击流数据实现广告点击流实时统计,首先我们对数据集进行分析,使读者了解广告点击流的数据结构。接着通过实现思路分析,使读者了解广告点击流实时统计的实现流程。然后通过IntelliJ IDEA开发工具实现广告点击流实时统计程序并将统计结果实时存储到HBase数据库,使读者掌握运用Java语言编写Spark Streaming、HBase和Kafka生产者程序的能力。最后在IntelliJ IDEA开发工具运行用户广告点击流实时统计程序,使读者了解IntelliJ IDEA开发工具运行程序的方法。

点赞一建三连!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/592333.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用ArcGIS与CAD制作设计底图

准备工作&#xff1a; 1、需要用到的软件&#xff1a;ArcGIS软件、AutoCAD&#xff1b; 2、卫星图数据、矢量数据&#xff08;因数据涉密&#xff0c;需要的同事请联系科技小组拷贝&#xff0c;并签署保密协议&#xff0c;严格履行保密责任&#xff09;。 现在&#xff0c;保…

小程序开发逆势爆发,如此会无疾而终?

2019年&#xff0c;小程序迎来了爆发式的增长&#xff0c;一年时间&#xff0c;微信小程序的活跃用户达到了3.2亿&#xff0c;日活跃用户高达5.4亿&#xff0c;在这巨大的数据背后&#xff0c;是无数商家和企业的努力与付出。小程序开发的优势显而易见&#xff0c;不少商家和企…

for in和for of的区别

for in for in 使用于可枚举的数据 如 对象 数组 字符串 什么是可枚举的&#xff1a;属性的enumerable值为true&#xff0c;表示可枚举 可以通过es7新增的属性 Object.getOwnPropertyDescriptors()验证 查看 Object.getOwnPropertyDescriptor() 方法用于 返回 指定 对象 上一个…

kubernetes安装dashboard教程

kubernetes安装dashboard教程 前提&#xff1a; kubernetes集群安装完毕 安装&#xff1a; 1.到github获取配置文件 github下面给出方法说使用下面的直接执行就可以了&#xff0c;但是最近不知道为何找不到地址。 kubectl apply -f https://raw.githubusercontent.com/ku…

查看网页cookie的方法

方法一 进入目标网页后&#xff0c; 按F12&#xff0c;找到Console&#xff0c;在filter框内输入&#xff1a;document.cookie&#xff0c;然后回车 如果filter框内输入后下面没有显示&#xff0c;需要在2的位置输入document.cookie回车 其中红色的内容即为cookie内容 不过这…

如何使用OpenAI GPT-3进行自然语言生成?

自然语言生成是一项非常引人注目的技术&#xff0c;可以让计算机像人类一样理解、生成自然语言文本。最近&#xff0c;OpenAI发布了一种名为GPT-3的巨型语言模型&#xff0c;它是史上最强大的自然语言生成模型之一。在本文中&#xff0c;我将介绍如何使用GPT-3进行自然语言生成…

vue-admin-template后台管理模板在windows/linux/maxos使用

能用克隆与编译运行命令: # 克隆项目 git clone https://github.com/PanJiaChen/vue-admin-template.git# 进入项目目录 cd vue-admin-template# 安装依赖 npm install# 开发者模式运行 npm run dev 1. macos: 降级为NODEJS 16运行工程: export NODE_OPTIONS--openssl-legac…

【服务器】本地搭建PHP简单Imagewheel私人云图床

文章目录 1.前言2. Imagewheel网站搭建2.1. Imagewheel下载和安装2.2. Imagewheel网页测试2.3.cpolar的安装和注册 3.本地网页发布3.1.Cpolar临时数据隧道3.2.Cpolar稳定隧道&#xff08;云端设置&#xff09;3.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 4.公网访问测…

AndroidStudio Logcat中文乱码

1&#xff1a;Help-Edit Custom VM Options...&#xff0c;添加&#xff1a; -Dfile.encodingUTF-8 2&#xff1a;File-Settings....-Edittor-File Encodings,Global Encoding、Project Encoding设置为UTF-8 3&#xff1a;记得一定要重启AndroidStudio才会生效。

【Servlet编程】使用Smart Tomcat插件运行Servlet程序

前言: 大家好,我是良辰丫,在上一篇文章中我们已经学习了部署我们的第一个Servlet程序,想必大家对各个步骤已经有了一定的了解和认识,那么能不能优化一下各个步骤呢?每次打包部署有点麻烦哦!那么今天我们就来学习一个idea的插件,可以帮助我们简化我们的部署操作!!!&#x1f49e…

如何在Microsoft Excel中使用COUNTIF函数

COUNTIF 是一个 Excel 函数,用于对满足单个条件的区域中的单元格进行计数。COUNTIF可用于计算包含日期、数字和文本的单元格。COUNTIF 中使用的条件支持逻辑运算符(>、<、<>、=)和通配符(*、?)进行部分匹配。 例如,我们想计算包含 Google或 Facebook 的单元…

Android的消息机制

Android的消息机制 Android的消息机制概述 Android的消息机制主要指的是Handler的运行机制以及Handler所附带的MessageQueue和Looper的工作机制 Handler的主要作用是将一个任务切换到某个指定的线程中执行。 它的主要用处就是当要更新UI界面的时候,我们不能在非UI线程进行更…

React学习笔记九-高阶函数与函数柯里化

此文章是本人在学习React的时候&#xff0c;写下的学习笔记&#xff0c;在此纪录和分享。此为第九篇&#xff0c;主要介绍高阶函数与函数柯里化。 高阶函数&#xff0c;和函数的柯里化&#xff0c;是学习react的拓展&#xff0c;方便以后优化代码&#xff0c;更好的学习react。…

c语言编程练习题:7-115 小于m的最大的10个素数

#include <stdio.h> int is_prime(int a){for (int i2;i<a;i){if (a%i0){return 0;}}return 1; }int main(){int n;int count10;if (scanf("%d",&n)!EOF && n>50 && n<20000){// 计算150&#xff0c;分配给5&#xff0c;2&#x…

UOS桌面系统开机进入Busybox

UOS桌面系统开机进入Busybox 一、问题现象二、解决方案1、livecd工具修复a、制作livecd工具盘b、从优盘启动c、磁盘修复 2、使用fsck修复a、找出有问题的分区b、修复分区c、重启电脑 一、问题现象 开机进入如下图所示界面 问题原因&#xff1a;roota分区损坏 二、解决方案 …

MySQL — InnoDB引擎、MySQL架构、事务原理、MVCC

文章目录 InnoDB引擎一、逻辑存储架构二、架构2.1 内存结构2.1.1 Buffer Pool 缓冲池2.1.2 Change Buffer 更改缓冲区2.1.3 Log Buffer 日志缓冲区域2.1.4 Adaptive Hash Index 自适应hash索引 2.2 磁盘结构2.2.1 System Tablespace 系统表空间2.2.2 File-Per-Table Tablespace…

搭建一个vuepress静态网站及配置

搭建一个vuepress静态网站及配置 一、搭建一个vuepress网站1、创建并进入一个新目录2、初始化3、安装依赖4、创建文档5、配置启动命令及启动6、展示效果 二、配置及丰富vuepress网站1、增加配置文件2、配置侧边栏目录3、使用部分markdown语法完善页面 一、搭建一个vuepress网站…

【Python实战】Python采集热榜数据

前言 大家好,我们今天来爬取热搜榜,把其文章名称,链接和作者获取下来,我们保存到本地,我们通过测试,发现其实很简单,我们只要简单获取数据就可以。没有加密的东西。 效果如下: 环境使用 python 3.9pycharm模块使用 requests模块介绍 requests requests是一个很…

​​​​Linux Shell 实现一键部署Ruby3

ruby Ruby&#xff0c;一种简单快捷的面向对象&#xff08;面向对象程序设计&#xff09;脚本语言&#xff0c;在20世纪90年代由日本人松本行弘(Yukihiro Matsumoto)开发&#xff0c;遵守GPL协议和Ruby License。它的灵感与特性来自于 Perl、Smalltalk、Eiffel、Ada以及 Lisp …

【上篇】我们邀请了4位专家来探讨消费市场的新增量:W型机会、单客经济、日本市场、DTC......

好久不见了&#xff0c;我是增长黑盒的创始人yolo。最近我们总是发布一些严肃型的行业报告&#xff0c;相信大家的动作都是在第一时间点个收藏&#xff0c;然后....就没有然后了。 所以&#xff0c;今天我们的内容没有复杂的图表和数据&#xff0c;想用比较轻松的对话形式来呈现…