大数据编程实验四:SparkStreaming编程

news2024/12/25 14:23:09

大数据编程实验四:SparkStreaming编程

文章目录

  • 大数据编程实验四:SparkStreaming编程
    • 一、实验目的与要求
    • 二、实验内容
    • 三、实验步骤
      • 1、利用Spark Streaming对不同类型数据源的数据进行处理
      • 2、完成DStream的两种有状态转换操作
      • 3、完成把DStream的数据输出保存到MySQL数据库中

一、实验目的与要求

  1. 通过实验掌握Spark Streaming的基本编程方法
  2. 熟悉利用Spark Streaming处理来自不同数据源的数据
  3. 熟悉DStream的各种转换操作
  4. 熟悉把DStream的数据输出保存到文本文件或MySQL数据库中

二、实验内容

  1. 参照教材示例,利用Spark Streaming对不同类型数据源的数据进行处理
  2. 参照教材示例,完成DStream的两种有状态转换操作
  3. 参照教材示例,完成把DStream的数据输出保存到文本文件或MySQL数据库中

三、实验步骤

1、利用Spark Streaming对不同类型数据源的数据进行处理

  • 文件流

    首先在虚拟机中打开第一个终端作为数据流终端,创建一个logfile目录:

    cd /usr/local/spark/mycode
    mkdir streaming
    cd streaming
    mkdir logfile
    

    然后我们打开第二个终端作为流计算终端,在我们创建的目录下面新建一个py程序:

    vim FileStreaming.py
    

    然后输入如下代码:

    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    conf = SparkConf()
    conf.setAppName('TestDStream')
    conf.setMaster('local[2]')
    sc = SparkContext(conf = conf)
    ssc = StreamingContext(sc, 10)
    lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
    words = lines.flatMap(lambda line: line.split(' '))
    wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b)
    wordCounts.pprint()
    ssc.start()
    ssc.awaitTermination()
    

    在这里插入图片描述

    保存该文件并执行如下命令:

    /usr/local/spark/bin/spark-submit FileStreaming.py
    

    然后我们进入数据流终端,在logfile目录下新建一个log2.txt文件,然后往里面输入一些英文语句后保存退出,再次切换到流计算终端,就可以看见打印出单词统计信息了。

    在这里插入图片描述

  • 套接字流

    我们继续在流计算端的streaming目录下创建一个socket目录,然后在该目录下创建一个DataSourceSocket.py程序:

    mkdir socket
    cd socket
    vim NetworkWordCount.py
    

    并在py程序中输入如下代码:

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
            exit(-1)
        sc = SparkContext(appName="PythonStreamingNetworkWordCount")
        ssc = StreamingContext(sc, 1)
        lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
        counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .reduceByKey(lambda a, b: a+b)
        counts.pprint()
        ssc.start()
        ssc.awaitTermination()
    

    在这里插入图片描述

    我们再在数据流终端启动Socket服务器端:

    nc -lk 8888
    

    然后我们再进入流计算终端,执行如下代码启动流计算:

    /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 8888
    

    然后我们在数据流终端内手动输入一行英文句子后回车,多输入几次,流计算终端就会不断执行词频统计并打印出信息。

    在这里插入图片描述

  • RDD队列流

    我们继续在streaming目录下新建rddqueue目录并在该目录下创建py程序:

    mkdir rddqueue
    cd rddqueue/
    vim RDDQueueStreaming.py
    

    然后在py文件中输入如下代码:

    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
        sc = SparkContext(appName="PythonStreamingQueueStream")
        ssc = StreamingContext(sc, 2)
        #创建一个队列,通过该队列可以把RDD推给一个RDD队列流
        rddQueue = []
        for i in range(5):
            rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
            time.sleep(1)
        #创建一个RDD队列流
        inputStream = ssc.queueStream(rddQueue)
        mappedStream = inputStream.map(lambda x: (x % 10, 1))
        reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
        reducedStream.pprint()
        ssc.start()
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
    

    在这里插入图片描述

    保存退出后再执行如下命令:

    /usr/local/spark/bin/spark-submit RDDQueueStreaming.py
    

    在这里插入图片描述

2、完成DStream的两种有状态转换操作

  • DStream无状态转换操作

    上面的词频统计程序NetworkWordCount就采取了无状态转换操作。

  • DStream有状态转换操作

    我们在socket目录下创建WindowedNetworkWordCount.py程序并输入如下代码:

    from __future__ import print_function
    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
            exit(-1)
        sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
        ssc = StreamingContext(sc, 10)
        ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint")
        lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
        counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      . reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
        counts.pprint()
        ssc.start()
        ssc.awaitTermination()
    

    在这里插入图片描述

    然后我们在数据流终端执行如下命令启动服务器:

    cd /usr/local/spark/mycode/streaming/socket/
    nc -lk 6666
    

    然后再在流计算终端运行我们刚写的代码:

    /usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 6666
    

    在数据流终端输入英文就可以看见统计结果了。

    在这里插入图片描述

3、完成把DStream的数据输出保存到MySQL数据库中

我们首先启动MySQL数据库:

systemctl start mysqld.service
mysql -u root -p

然后创建spark数据库和wordcount表:

mysql> create database spark;
mysql> use spark;
mysql> create table wordcount (word char(20), count int(4));

然后再在终端安装python连接MySQL的模块:

pip3 install PyMySQL

然后我们在streaming目录下新建stateful目录并在该目录下创建py文件:

mkdir stateful
cd stateful/
vim NetworkWordCountStatefulDB.py

并在py文件中输入如下代码:

from __future__ import print_function 
import sys 
import pymysql 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext 
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: NetworkWordCountStateful <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful") 
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) 
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0) 
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
                          .map(lambda word: (word, 1))\
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD) 
    running_counts.pprint() 
    def dbfunc(records):
        db = pymysql.connect("localhost","root","123456","spark")
        cursor = db.cursor() 
        def doinsert(p):
            sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()
        for item in records:
            doinsert(item) 
    def func(rdd):
        repartitionedRDD = rdd.repartition(3)
        repartitionedRDD.foreachPartition(dbfunc)
    running_counts.foreachRDD(func)
    ssc.start()
    ssc.awaitTermination()

在这里插入图片描述

然后我们新建一个数据源终端并执行如下命令:

cd /usr/local/spark/mycode/streaming/stateful/
nc -lk 5555

然后再在我们的流计算终端运行我们该编写的代码:

/usr/local/spark/bin/spark-submit NetworkWordCountStatefulDB.py localhost 5555

然后就可以把词频统计的结果写入MySQL中了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/54675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

推荐一个对pytorch代码详细注释的github项目

今天在无意间找一个pytorch代码和注释的Github项目。 先上项目&#xff1a; https://github.com/labmlai/annotated_deep_learning_paper_implementations 这个项目还有个网站&#xff0c;地址&#xff1a;https://nn.labml.ai/ 这个项目将论文和pytorch代码结合起来&#xff…

jsp源码商城系统Myeclipse开发mysql数据库servlet开发java编程计算机网页项目

一、源码特点 JSP 源码商城系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统采用serlvetdaobean mvc 模式&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发…

想学习如何把excel图片转表格?1分钟教会你图片转表格怎么转

应该有不少小伙伴接收过上司或领导以图片格式发送过来的excel表格吧&#xff1f;并且还会要求我们将里面的内容整理为电子档&#xff0c;便于后期的内容编辑以及数据修改。 而当你们收到这种任务时&#xff0c;是怎么去操作的呢&#xff1f;是不是大部分人会选择手动重新制作&a…

【数据可视化】第四章—— 基于pandas的数据可视化(pandas数据结构)

文章目录前言1. Pandas库的引用2. Pandas库的数据类型2.1 Series类型2.2 Series创建方式2.3 Series类型的基本操作2.3.1 Series类型的切片和索引2.3.2 Series类型的对齐操作2.3.3 Series类型的name属性2.3.4 Series类型的修改2.4 DataFrame类型2.5 DataFrame类型创建2.6 DataFr…

毕设选题推荐基于python的django框架医院预约挂号系统

精彩专栏推荐订阅&#xff1a;在 下方专栏&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f496;&#x1f525;作者主页&#xff1a;计算机毕设老哥&#x1f525; &#x1f496; Java实战项目专栏 Python实…

公网SSH远程连接内网Ubuntu主机【cpolar内网穿透】

SSH为建立在应用层基础上的安全协议&#xff0c;专为远程登录会话和其他网络服务提供安全性的协议。但在没有公网IP的环境下&#xff0c;只能在同个局域网下远程。 本篇教程主要实现通过内网穿透&#xff0c;在公网环境下SSH远程登录内网的Ubuntu主机&#xff0c;突破局域网的…

manjaro gnome 记录 3 配置国内镜像源

manjaro gnome 记录 3 配置国内镜像源初manjaro 记录 3 配置国内镜像源更改这个文件设置源初 希望能写一些简单的教程和案例分享给需要的人 manjaro 记录 3 配置国内镜像源 打开图像界面的软件管理&#xff0c;点击右上角&#xff1a;三个点的图标 点击首选项 输入管理员密…

Day17-购物车页面-收获地址-初步封装my-address组件

1.创建收货地址组件&#xff08;my-address&#xff09; 我的操作&#xff1a; 1>在uni_modules文件夹右键新建一个组件 2>还需要自己补全代码 1>和2>的阶段效果图&#xff1a; my-address组件已经被渲染成功了。 *********************************************…

一文看懂linux 内核网络中 RPS/RFS 原理

1 自带 irqbalance 瓶颈 基于简单的中断负载均衡(如系统自带的irqbalance进程)可能会弄巧成拙。因为其并不识别网络流&#xff0c;只识别到这是一个数据包&#xff0c;不能识别到数据包的元组信息。 在多处理器系统的每个处理器都有单独的硬件高速缓存&#xff0c;如果其中一…

黑马程序员软件测试实战项目

Ego微商 “Ego微商”微信小程序应用&#xff0c;主要针对于有特色的食品类商品线上零售。通过微信平台的大流量入口&#xff0c;在一定程度上升高了特色食品的影响力&#xff0c;同时借助微信的模板消息快速推送更新的商品&#xff0c;实现轻量级应用的C2C或者是B2C的线上销售…

《论文阅读》BALM: Bundle Adjustment for Lidar Mapping

留个笔记自用 BALM: Bundle Adjustment for Lidar Mapping 做什么 首先是最基础的&#xff0c;Structure-from-Motion&#xff08;SFM&#xff09;&#xff0c;SFM可以简单翻译成运动估计&#xff0c;是一种基于dui8序列图片进行三维重建的算法。简单来说就是是从运动中不同…

12月2日第壹简报,星期五,农历十一月初九

12月2日第壹简报&#xff0c;星期五&#xff0c;农历十一月初九1. 银保监会&#xff1a;2023年1月起在北京、上海、江苏、浙江、福建、广东等10个省市开展商业养老金业务试点。2. 国家首批未来产业科技园试点名单出炉&#xff1a;空天科技未来产业科技园、未来能源与智能机器人…

2022-12-02 编译Android平台OpenCV,用到读取视频时报错:AMediaXXX

文章目录编译Android平台OpenCV&#xff0c;用到读取视频时报错&#xff1a;解决参考编译Android平台OpenCV&#xff0c;用到读取视频时报错&#xff1a; ld: error: undefined symbol: AMediaExtractor_new ld: error: undefined symbol: AMediaExtractor_setDataSourceFd ld…

PyQt5的安装

0. 准备工作 Anaconda3-5.2.0-Windows-x86_64pycharm-professional-2018.2.4PyQt5 5.8.1 1. 如何正确安装PyQt5&#xff1f; 1.1 安装PyQt5 pip install PyQt5 -i https://pypi.douban.com/simple- i表示指定安装源&#xff0c;表示国内源 https://pypi.douban.com/simple …

创建一个SpringCloud项目

文章目录1.首先在**SpringCloud官网**中查看依赖版本号2.创建主Maven项目&#xff1a;在pom文件中引入依赖3.再在这个Maven项目中创建子模块&#xff08;子模块也是Maven&#xff09;(1)创建一个数据库db01和表dept(2)创建实体类dept&#xff08;注意&#xff1a;**每个实体类都…

导包问题解决--ImportError: DLL load failed while importing _path: 找不到指定的模块

一、问题反馈 在运行某个Python程序时&#xff0c;需要导入numpy和matplotlib包如下&#xff1a; import numpy as np import matplotlib.pyplot as plt运行程序时会报错“ImportError: DLL load failed while importing _path: 找不到指定的模块”&#xff1a; 二、问题解决…

信号发生器的电路构成及工作原理

一、信号发生器的电路构成 信号发生器的电路组成有多种形式&#xff0c;一般包括以下几个环节: 基本波形产生电路:波形产生可以由RC振荡器、文丘里电桥振荡器或压控振荡器产生。 波形转换电路:基本波形由正弦波、方波、三角波经过矩形波整形电路、正弦波整形电路、三角波整形电…

经众多Nature文章使用认证!艾美捷抗酒石酸酸性磷酸酶TRAP染色试剂盒

抗酒石酸酸性磷酸酶&#xff08;TRAP&#xff0c;tartrate-resistant acid phosphatase&#xff09;为破骨细胞的标志酶&#xff0c;特异地分布于破骨细胞中&#xff0c;为破骨细胞所特有。通常作为鉴别破骨细胞的重要标志物&#xff0c;使破骨细胞呈红色。Kamiya艾美捷抗酒石酸…

Java单表实现评论回复功能

Java单表实现评论回复功能1.简介2.功能实现图3.数据库设计4.实体类5.实现思路6.功能实现6.1 Sql入手6.2 业务实现7.前端实现8.最终成果1.简介 最近在写毕业设计的时候发现需要实现一个评论功能&#xff0c;然后看了一下掘金和csdn的评论区&#xff0c;如何实现评论功能&#xf…

【已解决】nginx x-cache: MISS

nginx x-cache: MISS 今天在使用nginx的时候发生了巨无语的一件事&#xff0c;明明我已经配置了代理缓存proxy_cache&#xff0c;但是一直未生效&#xff0c;于是我不断进行排错、nginx -s reload&#xff0c;问题始终没有解决。后来我尝试在另一台服务器上使用相同的配置&…