Spark 学习案例

news2025/1/12 21:54:52

案例1:搜索引擎日志分析

数据来源:使用搜狗实验室提供的【用户查询日志】数据。使用Saprk框架,将数据封装到RDD中进行数据处理分析。
数据网址:数据地址
这个地址可能过期了,需要的伙伴可以私聊博主。

数据格式:

搜索时间		用户ID				搜索内容		URL返回排名	用户点击顺序				用户点击的URL
23:00:03	43080219994871455	c语言			1			1					http://www.xxx.cn
23:00:03	09826806962227619	IDEA			20			22					http://www.xxx.cn

业务需求:

  • 1、搜索关键词统计(中文分词)
    • 我们通过Python中的jieba框架实现中文分词操作
  • 2、用户搜索点击统计
  • 3、搜索时间端统计

jieba中文分词框架

入门使用

通过pip进行安装
因为前面我们都是使用 Anaconda环境,所以在进入Anaconda安装jieba

conda activate pyspark  # 进入虚拟环境
pip install jieba -i https://pypi.tuna.tsinghua.edu.cn/simple  # 安装jieba
# -i https://pypi.tuna.tsinghua.edu.cn/simple 指定国内镜像源

因为我使用的远程解释器环境,需要点击以下远程解释器,更新解释器环境内置的库。

代码示例:
在这里插入图片描述

搜索引擎日志分期案例

代码:
主函数

# coding:utf8
import operator

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from defs import context_jieba, filter_words, append_words, extract_user_and_word
from operator import add

if __name__ == '__main__':
    # 0. 通过SparkConf对象构建SparkContext
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # 1. 读取数据文件
    file_rdd = sc.textFile("../../data/SogouQ.txt")  # '23:00:02\t8181820631750396\titheima\t5\t2\thttp://www.itcast.cn'

    # 2. 对数据进行切分
    split_rdd = file_rdd.map(
        lambda x: x.split("\t"))  # ['08:00:22', '44801909258572364', 'hadoop', '1', '2', 'http://www.itcast.cn']

    # 3. 因为要做多个需求,split_rdd 作为基础rdd 会被多次使用,所以进行持久化操作
    split_rdd.persist(StorageLevel.DISK_ONLY)

    # TODO:需求1:用户搜索的关键词分析(wordCount)
    # 取出搜索内容
    context_rdd = split_rdd.map(lambda x: x[2])
    # 对搜索内容进行分词
    words_rdd = context_rdd.flatMap(context_jieba)
    # print(words_rdd)
    # 分词结果和实际有出入,所以还需要手动处理结果
    # 院校 帮 -> 院校帮
    # 博学 谷 -> 博学谷
    # 传智播 客 -> 传智播客
    # 将单个词 帮,谷,客过滤,最后直接替换为词条
    filtered_rdd = words_rdd.filter(filter_words)
    final_words_rdd = filtered_rdd.map(append_words)
    # 对单词进行排序,求出前五
    result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False,
                                                                     numPartitions=1).take(5)

    print("需求1结果:", result1)

    # TODO 需求2:用户和关键词组合分析
    # 用户id,我喜欢hadoop
    # 用户id+喜欢  用户id+hadoop   这样可以知道用户搜索频率
    user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))
    # 对用户搜索内容进行分词
    user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)
    # 对内容进行分组聚合,排序,求前五
    result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b). \
        sortBy(lambda x: x[1], ascending=False, numPartitions=1). \
        take(5)
    print("需求2结果:", result2)

    # TODO 需求3:热门搜索时间段分析
    # 取出所有的时间
    time_rdd = split_rdd.map(lambda x: x[0])
    # 对时间进行处理,只保留小时精度即可
    hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))
    # lambda a, b: a + b 这个一直写很烦,python的operator包中提供了 加减乘除都有
    result3 = hour_with_one_rdd.reduceByKey(operator.add).\
        sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
        collect()
    print("需求3结果:", result3)

defs.py文件

# coding:utf8

import jieba


def context_jieba(data):
    """jieba分词工具"""
    seg = jieba.cut_for_search(data)
    l = list()
    for word in seg:
        l.append(word)

    return l


def filter_words(data):
    """过滤不要的词"""
    return data not in ["谷", "帮", "客"]


def append_words(data):
    """修订关键词内容"""
    if data == '传智播': data = '传智播客'
    if data == '博学': data = '博学谷'
    if data == '院校': data = '院校帮'
    return (data, 1)


def extract_user_and_word(data):
    """传入数据格式:(用户id,我喜欢hadoop)"""
    user_id = data[0]
    content = data[1]
    # 对content进行分词
    words = context_jieba(content)
    return_list = list()
    for word in words:
        # 过滤 ["谷", "帮", "客"]
        if filter_words(word):
            return_list.append((user_id + "_" + append_words(word)[0] , 1))

    return return_list


提交代码到YARN集群运行

1、修改主文件

# conf = SparkConf().setAppName("test").setMaster("local[*]")
conf = SparkConf().setAppName("test")
# file_rdd = sc.textFile("../../data/SogouQ.txt")
file_rdd = sc.textFile("hdfs://node1:8020/input/SoguoQ.txt")

2、将两个文件上传到Linux中
3、因为代码中使用jieba库,所以所有的机器都应该安装jieba库
因为Executor运行在其他机器上时,没有jieba库会运行报错
4、执行代码

spark-submit --master yarn --py-files /root/defs.py /root/09_Demo_search.py

提交成功
在这里插入图片描述

普通提交成功
榨干集群性能提交
首先我们需要先查看集群的资源有多少

  • 查看CPU有几核 cat /proc/cpuinfo | grep processor | wc -l
  • 查看内存多大 free -g
    在这里插入图片描述
    我学习使用,只分配了1G,但是显示是0.9… ,所以是显示0,将total mem 单元格加1,就是当前机器内存
spark-submit --master yarn --py-files /root/defs.py 
--executor-memory 512m  # 指定每个Executor的内存
--executor-cores 1 #  指定每个Executor的可使用cpu核心数
--num-executors 3 # 指定executor总数 ,一般几台机器就指定几个
/root/09_Demo_search.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/188101.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

css清除浮动的方法

浮动的盒子会脱离标准流,不占有自己原先的位置,导致下面的其他标签往上移动 此时,可以给浮动的盒子添加一个父盒子(块级),并设置高度,就可以避免下面的其他标签上浮 但是有时候父盒子不能设置高…

pycharm:新建虚拟环境和安装依赖

前言 小编深有体会,在刚开始用pycharm跑python的项目的时候,一时间不知道如何下手,特别是作为一个新手小白,这里总结了一份新手避坑指南,主要是新建虚拟环境(生成一个项目对应的解释器)以及安装…

温度预测 python | 使用 Python 可以使用机器学习模型进行温度预测

使用 Python 可以使用机器学习模型进行温度预测。常用的模型有回归分析、随机森林等。使用前需要准备足够的历史数据并进行特征工程,构建模型并进行训练,最后使用预测结果。 文章目录温度预测 回归分析导入必要的库:读取温度数据:…

Java中的插入排序和希尔排序

插入排序&&希尔排序插入排序希尔排序上一篇博客我给大家伙说了一下子堆排序,之所以我把插入排序和希尔排序放在一起呢,是因为希尔排序实际上用到了插入排序的思想,希望下面的内容能够帮助到大家.对于插入排序呢,我们可以参考抓牌顺牌,就在一般情况下,我们也不考虑什么组…

对象在Eden区分配

一、对象在Eden区分配大多数情况下,对象在新生代中 Eden 区分配。当 Eden 区没有足够空间进行分配时,虚拟机将发起一次Minor GC。在测试之前我们先来看看 Minor GC和Full GC 有什么不同呢?Minor GC/Young GC:指发生新生代的的垃圾…

oracle11g SAP测试机归档日志暴增排查(二)

接上面一的内容,通过logminer可以知道是因为oracle11g设置awr快照引起的插入数据,所以要看这个插入是否正常。 之前也发现SYSAUX表空间也没有多少了,应该这个原因引起产生大量的日志 6、查找SYSAUX表空间满的原因 对于SYSAUX表空间而言&…

【博学谷学习记录】大数据课程-学习第五周总结

Hadoop概述 Hadoop介绍 Hadoop是Apache旗下的一个用java语言实现开源软件框架,是一个开发和运行处理大规模数据的软件平台。允许使用简单的编程模型在大量计算机集群上对大型数据集进行分布式处理。 狭义上说,Hadoop指Apache这款开源框架,它…

Gateway, Zuul, Oauth2.0, 前后端分离, 定制页面,登录回调接口的处理

由于公司与Alexa平台接入了语音控制的功能,需要将公司的账号与Alexa的账号进行绑定,所以需要账号授权的操作,也就是使用授权码模式。开发过程中遇到了很多坑,网上关于前后端分离的定制页面的介绍又很少,前前后后花了一…

泰拉瑞亚灾厄NPC不复活x哥布林军队入侵中断

文章首发及后续更新:https://mwhls.top/4415.html,无图/无目录/格式错误/更多相关请至首发页查看。 新的更新内容请到mwhls.top查看。 欢迎提出任何疑问及批评,非常感谢! 目录 NPC 不复活 原因 解决办法 哥布林军队入侵中断 说…

2019-ICML-Towards Graph Pooling by Edge Contraction

2019-ICML-Towards Graph Pooling by Edge Contraction Paper: https://graphreason.github.io/papers/17.pdf Code: https://github.com/Jiajia43/pytorch_geometric 通过边收缩实现图池化 池化层可以使GNN对抽象的节点组而不是单个节点进行推理,从而增加其泛化潜…

MongoDB下载安装

MongoDB 是一个基于分布式文件存储的数据库。由 C 语言编写,旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。 MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。 (摘…

音频编辑服务UI SDK接入指导及常见问题

华为 HMS Core 音频编辑服务(Audio Editor Kit)是华为帮助全球开发者快速构建各类应用音频能力的服务,汇聚了华为在音乐、语音等相关音频领域的先进技术。音频编辑服务为开发者们提供音频基础编辑、AI配音、音源分离、空间渲染、变声、多种音…

IT自动化运维体系的搭建

大家好,我是技福的小咖老师。 对于构建IT运维管理系统而言,如何使用系统的方法来改善运维服务,以及对运维过程进行全面审查尤为重要。今天我们就来讲讲IT自动化运维体系的搭建。 设立IT运维优先原则 优先处理原则是指定义出IT运维的每个关键…

JAVA工具-JDK、JRE、JVM、JIT

目录 概要 JDK和JRE区别 JAVA工具间的联系 JAVA源代码如何被操作系统执行 补充:JIT 概要 JDK:Java Developers Kit-Java开发工具 JRE:Java Runtime Environment-Java运行环境 JVM:Java Virtual Machine-Java虚拟机 JIT:J…

阿里的又一款数据高效同步工具DataX,真香!

我们公司有个项目的数据量高达五千万,但是因为报表那块数据不太准确,业务库和报表库又是跨库操作,所以并不能使用 SQL 来进行同步。当时的打算是通过 mysqldump 或者存储的方式来进行同步,但是尝试后发现这些方案都不切实际&#…

MySQL的回表

核心问题 什么是回表? 答: 回表是一个过程,是获取到主键后再通过主键去查询数据的一个过程就叫回表。 那这个主键从哪来? 从叶子结点存储的内容来,如果存储的是非聚簇索引则通过叶子节点存储的值获取,该值…

机器学习笔记之生成对抗网络(一)逻辑介绍

机器学习笔记之生成对抗网络——逻辑介绍引言生成对抗网络——示例生成对抗网络——数学语言描述生成对抗网络——判别过程描述引言 本节将介绍生成对抗网络的基本逻辑与数学语言描述。 生成对抗网络——示例 生成对抗网络(Generative Adversarial Networks,GAN)&#xff0c…

视觉 → 检测提取

目标检测任务非常有趣且具有挑战性。有些任务非常复杂,需要更多数据才能有所产出。但在这篇文章中,我将展示一个符号检测的小任务,它可以用更少的数据完成。该项目的目的是使用计算机视觉技术从一组给定的图像中提取文本并检测各种符号。在这…

UniApp已经接了手机数据线,但运行工具警告 “没有检查到设备“ (华为手机为例 进行解决)

大部分第一次使用uni进行手机调试都会遇到这个问题 首先 将手机的数据线插入电脑的usb接口是必备前提 然后 就是手机的权限拦截了设备扫描 这就是uni工具找不到设备的原因 接入手机线后 数据会弹出一个USB的提示 点进去之后 我们要设置 允许传输文件 千万别仅充电 接下来的…

Java 以数据流的形式发送数据request Java 数据封装到request中

Java 以数据流的形式发送数据request Java 数据封装到request中 一、描述 1、在做微信支付结果通知的时候,看到一个描述:微信会把相关支付结果及用户信息通过数据流的形式发送给商户 ,那么java如何通过数据流的形式发送数据呢? 二…