PySpark3.4.4_基于StreamingContext实现网络字节流统计分析

news2025/1/9 2:08:21

网络字节流与嵌套字节流的区别

  1. 概念解释

    • 网络嵌套字节流
      • 在网络编程的情境下,网络嵌套字节流通常是指将字节流(字节序列)以一种分层或者包含的方式进行组织,用于在网络传输过程中更好地处理数据。例如,在一个复杂的网络协议栈中,高层协议的数据单元(往往也是字节流形式)可以嵌套在底层协议的字节流之中。这就好比包裹的嵌套,外层包裹可能包含了内层包裹的相关信息以及内层包裹本身。以 HTTP 协议在 TCP/IP 协议之上传输为例,HTTP 消息(本身是字节流)被嵌套在 TCP 的字节流中进行传输。TCP 协议负责将 HTTP 消息切割成合适的片段(字节流形式),加上 TCP 头信息(也是字节流),然后通过网络发送。接收端的 TCP 协议先处理接收到的字节流,提取出 HTTP 消息的字节流部分,再交给上层的 HTTP 协议处理。
    • 套字节流
      • 这个概念不是很常见,如果理解为 “包裹字节流” 的意思,和网络嵌套字节流有相似之处。不过,“套字节流” 可能更强调简单的封装形式,即将一个字节流作为另一个字节流的一部分进行简单包装。比如,在加密通信中,原始的字节流(如要传输的文件内容字节流)被加密算法处理后,会生成一个新的字节流,这个新字节流可以看作是原始字节流被 “套” 上了一层加密后的字节流。它可能没有像网络嵌套字节流那样涉及复杂的网络协议层次关系。
  2. 应用场景区别

    • 网络嵌套字节流
      • 广泛应用于网络通信的各个层次。在构建网络服务器和客户端应用时,不同层次的网络协议交互都涉及网络嵌套字节流。例如,在电子邮件传输(SMTP、POP3 等协议)中,邮件内容字节流被嵌套在相应的协议字节流中在网络上传输。它主要用于保证数据在不同网络环境和协议间的正确传递和解析,确保数据能够从源端的应用层通过层层协议封装,经过网络传输,最终在目的端的应用层被正确还原。
    • 套字节流
      • 更多地用于数据安全和简单的数据封装场景。如在数字签名的应用中,消息的字节流被 “套” 上签名信息的字节流,用于验证消息的来源和完整性。或者在数据存储中,为了区分不同类型的数据,将数据字节流 “套” 上一个标识头字节流进行存储,方便后续读取和分类处理。
  3. 处理方式区别

    • 网络嵌套字节流
      • 需要严格按照网络协议栈的规则进行处理。在发送端,数据从高层协议开始,一层一层地进行字节流的嵌套和封装,添加每层协议所需的头部、尾部等信息。在接收端,则是相反的过程,从最外层的协议字节流开始,逐步解包和解析,根据每层协议的规范提取出内层协议的字节流,直到最终得到应用层的数据字节流。这需要对各种网络协议的格式、功能和交互流程有深入的了解。
    • 套字节流
      • 处理相对简单,主要关注封装和提取两个操作。在封装时,根据具体的需求添加包裹字节流(如加密后的字节流添加到原始字节流外层)。在提取时,按照预先定义的规则(如加密算法对应的解密规则、数据标识头的解析规则等)去除外层字节流,获取内部的原始字节流或者所需的数据。

PySpark代码开发

需要在ubuntu环境下或windows环境下,提前安装好spark执行环境

软件说明:

  1. spark 3.4.4
  2. python 3.9.20
  3. java jdk1.8.0_431

代码说明

DataSourceSoket.py 用于模拟生成实时字节流数据的脚本

# coding:utf8
import random
from socket import socket

server = socket()

server.bind(('localhost', 9999))
server.listen(1)
while True:
    # 为了方便识别,输出一个"I'm waiting the connect ..."
    print("I'm waiting the connect ...")
    conn, addr = server.accept()
    print("Connected by {0}".format(addr))
    print(f"Connected by {addr}")
    # 输出发送数据
    # 自定义10条中文数据在一个数据容器里,并随机选取一条中文数据集输出
    # 步骤1:创建一个列表作为数据容器
    data_container = []

    # 步骤2:向列表中添加10条不同的中文数据
    chinese_data = [
        "你好,世界",
        "今天天气真好",
        "学习是一件快乐的事",
        "分享知识,传递快乐",
        "探索未知的世界",
        "坚持就是胜利",
        "努力不懈,梦想终会实现",
        "失败乃成功之母",
        "平凡造就非凡",
        "相信自己,你是最棒的",
        "I like Spark",
        "I like Flink",
        "I like Hadoop"
    ]

    data_container.extend(chinese_data)

    # 步骤3:使用random.choice()随机选择并输出一条数据
    random_item = random.choice(data_container)
    print(random_item)
    conn.sendall(random_item.encode())
    conn.close()
    print("Connection closed")

pysparkStreamingNetwordCountCN.py  SparkStreaming处理实时数据流

# coding:utf8

from __future__ import print_function

import os
import sys
import jieba
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 设置环境变量,确保指向正确的 Java 解释器
os.environ['JAVA_HOME'] = '/opt/HadoopEco/jdk1.8.0_431'  # 替换为你的 JDK 8 安装路径
os.environ['SPARK_HOME'] = '/opt/HadoopEco/spark-3.4.4-bin-without-hadoop'

# 加载停用词表
def load_stopwords(file_path):
    """
    从指定文件或文件夹中加载停用词列表。

    参数:
    file_path (str): 停用词文件或文件夹的路径。

    返回:
    set: 包含停用词的集合。
    """
    stopwords = set()
    try:
        if os.path.isfile(file_path):
            with open(file_path, 'r', encoding='utf-8') as f:
                stopwords.update(line.strip() for line in f)
        elif os.path.isdir(file_path):
            for filename in os.listdir(file_path):
                file_full_path = os.path.join(file_path, filename)
                if os.path.isfile(file_full_path):
                    with open(file_full_path, 'r', encoding='utf-8') as f:
                        stopwords.update(line.strip() for line in f)
        else:
            print(f"Error: The path {file_path} is neither a file nor a directory.")
    except FileNotFoundError:
        print(f"Error: The file or directory {file_path} does not exist.")
    except PermissionError:
        print(f"Error: Permission denied for the file or directory {file_path}.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return stopwords

# 替换为你的停用词表路径或文件夹路径
stopwords = load_stopwords(sys.argv[3])  # 或 'path/to/stopwords_folder'

def sparkstreamingnetworkcount():
    global sc, ssc, lines
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 10)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

    def split_words(line):
        try:
            # 使用 jieba 进行中文分词
            chinese_words = jieba.lcut(line.strip())
            # 使用空格进行英文分词
            english_words = line.strip().split(" ")
            # 合并分词结果并过滤掉空字符串
            words = set(chinese_words + english_words) - {''}
            # 过滤掉停用词
            filtered_words = [word.lower() for word in words if word not in stopwords]
            return filtered_words
        except Exception as e:
            print(f"Error processing line: {line}, Error: {e}", file=sys.stderr)
            return []

    counts = lines.flatMap(split_words).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: networkcount.py <hostname> <port> <stopwords>", file=sys.stderr)
        exit(-1)

    sparkstreamingnetworkcount()

运行时的运行参数配置

运行结果如下

DataSourceSoket.py

pysparkStreamingNetwordCountCN.py 运行结果

注意事项:

1. 需要先启动 DataSourceSocket.py, 在启动 pysparkStreamingNetwordCountCN.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2257255.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Homework】【8】Learning resources for DQ Robotics in MATLAB

作业任务 创建一个名为“VS050RobotDH”的类&#xff0c;该类代表Denso VS050机器人&#xff0c;其DH参数如下表所示&#xff0c;并且完全由旋转关节组成。&#xff08;请记住第6课的内容&#xff09; θ \theta θ d d d a a a α \alpha α − π -\pi −π0.3450 π 2 \fra…

如何防御ARP欺骗 保护IP安全

在数字化浪潮席卷全球的今天&#xff0c;网络安全威胁如同暗流涌动&#xff0c;时刻考验着我们的防范能力。其中&#xff0c;ARP欺骗攻击作为一种隐蔽性强、成本低廉且危害严重的网络攻击手段&#xff0c;成为众多网络安全事件中的一颗“毒瘤”。那么我们究竟是如何防御ARP欺骗…

低代码场景案例配置——复杂数据模型下表单与表格关联字段的保存

主子表的场景是每个业务系统都绕不过的功能点&#xff0c;低代码能不能在业务上用的起来&#xff0c;这个是必须过的门槛。那么什么主子表有哪些场景的应用&#xff0c;如何配置呢&#xff0c;接下来我们就举个例详细说明 订单管理系统&#xff0c;场景描述&#xff1a; 在电…

方案拆解 | 打击矩阵新规频出!2025矩阵营销该怎么玩?

社媒平台的矩阵营销又要“变天”了&#xff1f;&#xff01; 11月18日&#xff0c;小红书官方发表了被安全薯 称为“小红书史上最严打击黑灰产专项”新规&#xff0c;其中就包括黑灰产矩阵号的公告。 ▲ 图源&#xff1a;小红书 实际上&#xff0c;不包括这次&#xff0c;今年…

C51小车项目-笔记11-SU-03T语音控制模块

一、网页配置 网站&#xff1a;智能公元/AI产品零代码平台 配置步骤&#xff1a; 发布版本&#xff0c;输入版本名字 等待SDK生成成功 成功之后下载SDK&#xff0c;完成之后将压缩包放到一个没有中文的文件目录中解压 二、接线 三、操作步骤 解压&#xff0c;以管理员身份打…

Springboot3介绍

一、Springboot3简介: https://docs.spring.io/spring-boot/docs/current/reference/html/getting-started.html?spmwolai.workspace.0.0.68b62306Q6jtTw#getting-started.introducing-spring-boot 无论使用XML、注解、Java配置类还是他们的混合用法&#xff0c;配置文件过于…

Mac上基于pyenv管理Python多版本的最佳实践

首先声明&#xff0c;你可以选择使用 Homebrew 来安装pyenv。我这里主要是想和我 Linux 设备上一致&#xff0c;所以选择使用脚本来安装pyenv。 准备安装脚本 这个安装的脚本来源于官方的的github仓库。 关于安装脚本的解读请看《pyenv 安装脚本解读》。 pyenv-installer.sh …

生成:安卓证书uniapp

地址&#xff1a; https://ask.dcloud.net.cn/article/35777 // 使用keytool -genkey命令生成证书&#xff1a; 官网&#xff1a; keytool -genkey -alias testalias -keyalg RSA -keysize 2048 -validity 36500 -keystore test.keystore ----------------------------------…

SpringBoot基于Redis+WebSocket 实现账号单设备登录.

引言 在现代应用中&#xff0c;一个账号在多个设备上的同时登录可能带来安全隐患。为了解决这个问题&#xff0c;许多应用实现了单设备登录&#xff0c;确保同一个用户只能在一个设备上登录。当用户在新的设备上登录时&#xff0c;旧设备会被强制下线。 本文将介绍如何使用 Spr…

【MySQL 进阶之路】事务并发情况分析

MySQL事务并发控制分析笔记 在数据库系统中&#xff0c;事务并发控制至关重要&#xff0c;能够确保多个事务并发执行时的数据一致性、隔离性和正确性。MySQL通过不同的锁机制控制并发操作&#xff0c;以确保事务的隔离性。以下是对事务A和事务B并发行为的详细分析&#xff0c;…

如何在小米平板5上运行 deepin 23 ?

deepin 23 加入了 ARM64 支持&#xff0c;这里尝试将 deepin 系统刷入平板中&#xff0c;平常使用中&#xff0c;带个笔记本电脑有时候也会嫌比较麻烦&#xff0c;把 Linux 系统刷入平板中既满足了使用需要&#xff0c;又满足了轻便的需求。为什么不使用 Termux &#xff1f;虽…

华为HarmonyOS 快速构建各种文本识别应用 -- 通用文字识别

适用场景 通用文字识别&#xff0c;是通过拍照、扫描等光学输入方式&#xff0c;将各种票据、卡证、表格、报刊、书籍等印刷品文字转化为图像信息&#xff0c;再利用文字识别技术将图像信息转化为计算机等设备可以使用的字符信息的技术。 可以对文档翻拍、街景翻拍等图片进行…

【系统架构核心服务设计】使用 Redis ZSET 实现排行榜服务

目录 一、排行榜的应用场景 二、排行榜技术的特点 三、使用Redis ZSET实现排行榜 3.1 引入依赖 3.2 配置Redis连接 3.3 创建实体类&#xff08;可选&#xff09; 3.4 编写 Redis 操作服务层 3.5 编写控制器层 3.6 测试 3.6.1 测试 addMovieScore 接口 3.6.2 测试 g…

【Docker】如何在Docker中配置防火墙规则?

Docker本身并不直接管理防火墙规则&#xff1b;它依赖于主机系统的防火墙设置。不过&#xff0c;Docker在启动容器时会自动配置一些iptables规则来管理容器网络流量。如果你需要更细粒度地控制进出容器的流量&#xff0c;你需要在主机系统上配置防火墙规则。以下是如何在Linux主…

java+ssm+mysql美妆论坛

项目介绍&#xff1a; 使用javassmmysql开发的美妆论坛&#xff0c;系统包含超级管理员&#xff0c;系统管理员、用户角色&#xff0c;功能如下&#xff1a; 用户&#xff1a;主要是前台功能使用&#xff0c;包括注册、登录&#xff1b;查看论坛板块和板块下帖子&#xff1b;…

【MFC】vs2019中使用sqlite3完成学生管理系统

目录 效果图list Contral 控件的简单使用使用sqlite3 效果图 使用sqlite3完成简单的数据库操作。 list Contral 控件的简单使用 本章只介绍基本应用 添加表头&#xff1a;语法&#xff1a; int InsertColumn(int nCol, LPCTSTR lpszColumnHeading, int nFormat LVCFMT_LEFT…

Java设计模式 —— 【创建型模式】建造者模式详解

文章目录 一、建造者模式二、案例实现三、优缺点四、模式拓展五、对比1、工厂方法模式VS建造者模式2、抽象工厂模式VS建造者模式 一、建造者模式 建造者模式&#xff08;Builder Pattern&#xff09; 又叫生成器模式&#xff0c;是一种对象构建模式。它可以将复杂对象的建造过…

单链表(C语言版本)

前提 不探讨头结点空链表可以插入和查找&#xff0c;不可删除一般不选择phead移动&#xff0c;定义一个新结点把phead赋给他&#xff0c;移动新结点即可单链表不适合在前面和后面插入或删除&#xff0c;适合在后面插入删除 头插 void SLPushFront(SLTNode** pphead, SLTDataTy…

VMware虚拟机搭建和镜像配置

VMware虚拟机搭建和镜像配置 下载安装VMware 开始下载 更改安装路径&#xff0c;需要一个大空间的盘 更改后下一步 下一步后&#xff0c;选择不主动升级更新 一直下一步 直到安装完毕 输入许可密钥&#xff0c;我下载的版本是12&#xff0c;输入完成点击输入&#xff…

使用PPT科研绘图导出PDF边缘留白问题解决方案

使用PPT画图导出PDF格式后&#xff0c;边缘有空白&#xff0c;插入latex不美观&#xff0c;解决方案为自定义PPT幻灯片母版大小&#xff0c;如题步骤为&#xff1a; 1、查看已制作好的图片的大小&#xff0c;即长度和宽度 2、选择自定义幻灯片大小 3、自定义幻灯片大小为第1…