spark第八章:Pyspark

news2024/11/24 7:20:09

系列文章目录

spark第一章:环境安装
spark第二章:sparkcore实例
spark第三章:工程化代码
spark第四章:SparkSQL基本操作
spark第五章:SparkSQL实例
spark第六章:SparkStreaming基本操作
spark第七章:SparkStreaming实例
spark第八章:Pyspark


文章目录

  • 系列文章目录
  • 前言
  • 一、环境准备
    • 1.安装Python3
    • 2.安装Pyspark
    • 3.测试环境
    • 4.提交py任务
  • 二、编写代码
    • 1.安装库
    • 2.core操作
    • 3.SQL操作
    • 4.Streaming操作
    • 在这里插入图片描述
  • 总结


前言

之前我们用scala完成了spark内容的学习,现在我们用Pyspark来进行spark集群操作.


一、环境准备

1.安装Python3

用Pyspark编写文件,自然要用Python的环境,centos7中可以用以下命令安装.

yum install python3

在这里插入图片描述
pyspark建议使用Python3.7及以上版本,但是centos7的默认源里边只有3.6,不过做最基本的练习还是够了.

2.安装Pyspark

用pip命令安装.

pip install pyspark==3.2.3

建议版本和自己的spark版本一致

3.测试环境

我这里用的是本地环境测试,没有打开全部集群.
在这里插入图片描述
在这里插入图片描述
这里就可以写代码了,但是咱们就不再这里写了.

4.提交py任务

这里我们用官方给的案例进行测试,如果没有问题,我们就可以开始学习了.

./bin/spark-submit examples/src/main/python/pi.py 10

在这里插入图片描述

二、编写代码

1.安装库

由于咱们在本地编写和测试文件,所以本地也需要Pyspark,但是不需要spark环境.

pip install pyspark==3.2.3

在这里插入图片描述
一个全新的虚拟环境

2.core操作

01_WC.py

from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    # 1.创建sparkContext对象
    conf = SparkConf().setMaster("local[*]").setAppName("WC")
    sc = SparkContext(conf=conf)

    # 2.读取文本数据
    rdd_init = sc.textFile("/home/atguigu/bigdatas/datas/wc.txt")

    # 3.数据切割
    flat_map = rdd_init.flatMap(lambda line: line.split(" "))

    # 4.数据格式转化
    rdd_map = flat_map.map(lambda word: (word, 1))

    # 5数据分组求和
    res = rdd_map.reduceByKey(lambda a,b:a+b)

    # 6.打印输出
    print(res.collect())

    # 7.关闭输出
    sc.stop()

在这里插入图片描述
从HDFS中读取文件.

from pyspark import SparkContext, SparkConf
import os

# 模拟集群用户
os.environ["HADOOP_USER_NAME"] = "atguigu"

if __name__ == '__main__':
    # 1.创建sparkContext对象
    conf = SparkConf().setMaster("local[*]").setAppName("WC")
    sc = SparkContext(conf=conf)

    # 2.读取文本数据
    rdd_init = sc.textFile("hdfs://192.168.10.102/wc.txt")

    # 3.数据切割
    flat_map = rdd_init.flatMap(lambda line: line.split(" "))

    # 4.数据格式转化
    rdd_map = flat_map.map(lambda word: (word, 1))

    # 5数据分组求和
    res = rdd_map.reduceByKey(lambda a, b: a + b)

    # 6.输出到HDFS
    res.saveAsTextFile("hdfs://192.168.10.102/output1")

    # 7.关闭输出
    sc.stop()

在这里插入图片描述

3.SQL操作

01_WC.py

from pyspark import SparkContext, SparkConf
import os

from pyspark.sql import SparkSession

os.environ["HADOOP_USER_NAME"] = "atguigu"

if __name__ == '__main__':
    # SparkSQL对象创建
    spark = SparkSession.builder.master("local[*]").appName("WC").getOrCreate()

    df = spark.read.format("text").load("datas/wc.txt")

    df.createTempView("t1")

    spark.sql("""
        select * from t1
    """).show()

在这里插入图片描述
成功打印表,可以使用.

02_udf.py
自定义函数

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

os.environ["HADOOP_USER_NAME"] = "atguigu"

if __name__ == '__main__':
    spark = SparkSession.builder.master("local[*]").appName("WC").getOrCreate()

    df_init = spark.createDataFrame([(1, "张三", "北京"), (2, "李四", "上海"), (3, "王五", "深圳"), ],
                                    schema="id integer,name String,address String")

    df_init.createTempView("t1")


    # 自定义函数
    def strJoin(str):
        return "name is " + str


    # 注册自定义函数
    strJoin_2 = spark.udf.register("strJoin_1", strJoin, StringType())



    # 使用函数
    # 方式1
    # SQL使用
    # spark.sql("""
    #     select id,strJoin_1(name),address from t1
    # """).show()

    # DSL使用
    # df_init.select(df_init["id"],strJoin_2(df_init["name"]),df_init["address"]).show()

    # 方式2
    strJoin_3 = F.udf(strJoin, StringType())
    # 简写
    @F.udf(returnType=StringType())
    def strJoin_udf(str):
        return "name is " + str

    # DSL使用
    # df_init.select(df_init["id"],strJoin_3(df_init["name"]),df_init["address"]).show()

    df_init.select(df_init["id"],strJoin_udf(df_init["name"]),df_init["address"]).show()

在这里插入图片描述
02_udf1.py

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

os.environ["HADOOP_USER_NAME"] = "atguigu"

if __name__ == '__main__':
    spark = SparkSession.builder.master("local[*]").appName("WC").getOrCreate()

    df_init = spark.createDataFrame([(1, "张三 北京"), (2, "李四 上海"), (3, "王五 深圳"), ],
                                    schema="id integer,nameANDaddress String")

    df_init.createTempView("t1")


    def str_split_udf(nameANDaddress: str):
        arr = nameANDaddress.split(" ")
        return [arr[0], arr[1]]


    schema = StructType().add("nn", StringType()).add("ar", StringType())
    str_split_udf_D = spark.udf.register("str_split_udf", str_split_udf, returnType=schema)

    # spark.sql("select id,str_split_udf(nameANDaddress).nn ,str_split_udf(nameANDaddress).ar from t1").show()

    df_init.select("id",str_split_udf_D("nameANDaddress")["nn"]).show()

在这里插入图片描述
连接hive
这里要先修改一下hive-site.xml文件
在这里插入图片描述
增加远程连接接口,如果集群需要连接hive,将这个文件扔到spark的conf目录.
03_hive.py


from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = SparkSession.builder.appName("hive")\
        .config("hive.metastore.uris", "thrift://192.168.10.102:9083")\
        .config("spark.sql.warehouse.dir", "hdfs://192.168.10.102:8020/usr/hive/warehouse")\
        .enableHiveSupport()\
        .getOrCreate()

    spark.sql("show databases").show()

然后启动hive,运行程序.
在这里插入图片描述

4.Streaming操作

01_WC.py

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    # 1.创建sparkContext对象
    conf = SparkConf().setMaster("local[*]").setAppName("WC")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    lines = ssc.socketTextStream("127.0.0.1", 9999)

    result = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    result.pprint()

    ssc.start()
    ssc.awaitTermination()

在这里插入图片描述

总结

由于咱们之前用scala做过很多案例,所以这利用Python就不写了,只完成最基本的操作即可.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/430005.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

带隙基准基本原理

目录 负温度系数电压 正温度系数电压 带隙基准 小结 如何产生一个不受温度变化,保持恒定的电压基准呢? 我们假设,如果将两个具有相反温度系数(TC)的量以适当的权重相加,那么结果就会成为零温度系数&am…

MIT6.824 lab2C2D实验总结

2C 就是持久化一些变量,日志,任期,投票给谁,2D(lastincludeterm, lastincludeindex, snapshot)。同时最难受的是Figure8Unreliable这个测试点,总是几百次出现一两个错误。最后发现是对论文一句话的歧义。这里讲解一下…

Linux(DHCP原理与配置)

文章目录一 、什么是DHCP1.1DHCP定义1.2DHCP好处1.3DHCP的分配方式二 、DHCP 的工作过程三 、DHCP中的设置3.1 DHCP参数3.2 相关操作步骤一 、什么是DHCP 1.1DHCP定义 DHCP(动态主机配置协议)是一个局域网的网络协议。指的是由服务器控制一段IP地址范围…

swagger文件上传接口没有选择文件按钮问题解决 使用@RequestPart注解

在使用文件上传API时,swagger没有选择文件按钮 在MultipartFile前面加上RequestPart注解 PostMapping("/importFile")ApiOperation(value "文件上传API")public Object importFile(ApiParam(value "文件流", required true) Reque…

超越ChatGpt,最近爆火的AutoGPT 究竟是什么

一、AutoGPT是什么 最近几天,一款基于GPT-4的最强语言模型AutoGPT火遍了整个AI圈。众所周知,此前爆火AI圈的ChatGPT,唯一不太好用的地方就是需要人类不断的prompt。因此,如果你想要ChatGPT帮你去做一件复杂的事情,那么…

第九章 法律责任与法律制裁

第九章 法律责任与法律制裁_副本 目录 第一节 法律责任的概念 一 法律责任的含义二 法律责任的特点 第二节 法律责任的分类与竞合 一 法律责任的分类 (一)根据责任行为所违反的法律的性质 民事责任:刑事责任行政责任违宪责任 (二…

【云原生 • Docker】cAdvisor+Prometheus+Grafana 10分钟搞定Docker容器监控平台

文章目录cAdvisorPrometheusGrafana 10分钟搞定Docker容器监控平台cAdvisor部署Prometheus部署Grafana部署cAdvisorPrometheusGrafana 10分钟搞定Docker容器监控平台 cAdvisor(Container Advisor) 是 Google 开源的一个容器监控工具,可用于对容器资源的使用情况和性…

企业级信息系统开发讲课笔记3.1 基于配置文件整合SSM框架实现用户登录

文章目录零、本节学习目标一、采用MVC架构二、用户登录运行效果三、基于XML配置方式整合SSM框架实现用户登录(一)创建数据库与表1、创建数据库2、创建用户表3、在用户表里插入记录(二)创建Maven项目(三)项目…

【手把手刷CCF】202303-2-垦田计划100分(超简单思路,含详细解释注释与代码)

文章目录:故事的开头总是极尽温柔,故事会一直温柔……💜一、🌳代码如下:二、🌵解题思路❤️❤️❤️忙碌的敲代码也不要忘了浪漫鸭!故事的开头总是极尽温柔,故事会一直温柔……&…

vector使用+模拟实现

目录 vector介绍 常见接口 构造函数 迭代器 容量操作 元素访问 增删查改 模拟实现 模拟实现要点图解 整体代码 迭代器失效问题 内部失效 外部失效 深浅拷贝问题 vector介绍 vector是表示可变大小数组的序列式容器。vector采用连续的空间存储元素,大小…

HTML5 <meta> 标签

HTML5 <meta> 标签 实例 描述 HTML 文档的元数据&#xff1a; <head> <meta name"description" content"免费在线教程"> <meta name"keywords" content"HTML,CSS,XML,JavaScript"> <meta name"auth…

全志v851s 在 eyesee-mpp 中添加一个hello_world sample 的流程

1. 为什么要在eyesee-mpp 中添加sample&#xff1f; 1&#xff09;保持整个openwrt 应用程序编写的完成性&#xff1b; 2&#xff09;eyesee-mpp 中包含了几乎所有全志视频音频模块的sample 以及 头文件&#xff0c;参考以及头文件调用起来非常方便&#xff0c;而且可以学习各种…

MongoDB 聚合管道中使用数组表达式运算符合并数组($concatArrays)

数组表达式运算符主要用于文档中数组的操作&#xff0c;接上一篇&#xff1a;MongoDB 聚合管道中使用数组表达式运算符&#xff08;$slice截取数组&#xff09;https://blog.csdn.net/m1729339749/article/details/130130328本篇我们主要介绍数组表达式运算符中用于合并数组的操…

InnoSetup制作安装包(EXE)

功能描述 1.666666.war为项目war包&#xff0c;666666.bat为启动war包脚本&#xff0c;通过InnoSetup将它们打包到安装包666666.exe 2.666666.exe安装包安装过程中将666666.bat注册为自启动服务&#xff0c;安装结束自动执行脚本启动项目666666.war --------------------------…

VxLAN数据中心L2/L3互通(端到端)

VxLAN数据中心端到端方式实现L2/L3互连&#xff0c;这种实现方式可以使数据中心属于同一个EVPN-VXLAN域&#xff0c;相较于hand-off方式通过端到端实现数据中心L2互连可以满足Mac mobility、ARP suppression等特性。 实现思路 DC1的Border-Leaf和DC2的Border-Leaf之间运行EBG…

测试:腾讯云轻量4核8G12M服务器CPU流量带宽系统盘

腾讯云轻量4核8G12M应用服务器带宽&#xff0c;12M公网带宽下载速度峰值可达1536KB/秒&#xff0c;折合1.5M/s&#xff0c;每月2000GB月流量&#xff0c;折合每天66GB&#xff0c;系统盘为180GB SSD盘&#xff0c;地域节点可选上海、广州或北京&#xff0c;4核8G服务器网来详细…

MySQL 日志

错误日志(error log): error log 主要记录 MySQL 在启动、关闭或者运行过程中的错误i西南西&#xff0c;在MySQL 的配置文件 my.cnf 中&#xff0c;可以通过 log-error/var/log/mysqld.log 执行 mysql 错误日志的位置 慢查询日志(slow query log): MySQL 的慢查询日志是 MyS…

【erlang】并发篇

PID类型 在之前的语法篇中&#xff0c;我们并没有介绍 PID这个类型&#xff0c;它和并发息息相关&#xff0c;因此我们在这里来学习它。 PID是进程标识符的意思&#xff0c;用来标识一个erlang进程。在所有相连的erlang节点中&#xff0c;PID都是唯一的。但是PID会被复用&…

从零搭建一个 Level-2 快照数据的因子计算平台

因子挖掘是量化交易的基础。近年来&#xff0c;Python 是很多研究员进行数据分析和因子挖掘的主流工具。但是通过 Python 挖掘的有效因子在投产时&#xff0c;通常需要由 QUANT 团队的研究员将代码提交给 IT 团队&#xff0c;IT 团队用 C 代码转写后部署到生产环境&#xff0c;…

1. HTMLCSS

文章目录1 盒子模型&#xff1a;1.1 盒子属性导图1.2 边框属性导图1.3 定位导图&#xff1a;2 HTML常用标签2.1 基本标签① HTML基本结构② HTML常见标签③ 特殊字符④ 列表标签a 无序列表&#xff1a;b 有序列表&#xff1a;⑤ 表单3 CSS快速上手3.1 background属性① 思维导图…