摸鱼大数据——Spark Structured Steaming——物联网数据分析案例

news2025/1/11 12:33:08

1、数据模拟器代码

  • 1- 创建一个topic, 放置后续物联网的数据 search-log-topic

 ./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic search-log-topic --partitions 3 --replication-factor 2
  • 2- 将代码放置到项目中:

import json
import random
import sys
import time
import os
from kafka import KafkaProducer
from kafka.errors import KafkaError

# 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"

# 快捷键:  main 回车
if __name__ == '__main__':
    print("模拟物联网数据")

    # 1- 构建一个kafka的生产者:
    producer = KafkaProducer(
        bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],
        acks='all',
        value_serializer=lambda m: json.dumps(m).encode("utf-8")
    )

    # 2- 物联网设备类型
    deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]

    while True:
        index = random.choice(range(0, len(deviceTypes)))
        deviceID = f'device_{index}_{random.randrange(1, 20)}'  # 设备ID
        deviceType = deviceTypes[index]  # 设备类型
        deviceSignal = random.choice(range(10, 100)) # 设备信号

        # 组装数据集
        print({'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
               'time': time.strftime('%s')})

        # 发送数据
        producer.send(topic='search-log-topic',
                      value={'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
                                       'time': time.strftime('%s')}
        )

        # 间隔时间 5s内随机
        time.sleep(random.choice(range(1, 5)))

  • 测试, 观察是否可以正常生成:

 ./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic search-log-topic

2、需求说明

目前咱们有一个模拟器程序, 可以向Kafka不断的写入数据

要做的是, 用Spark的结构化流接收数据, 并且对数据进行统计分析操作:

  • 求: 各种信号强度>30各种类型的设备数量 和 它们的平均信号强度

需求分析:

1- 需要按照设备类型进行分组,也就是维度是设备类型deviceType

2- 指标

设备数量:deviceID

平均信号强度:deviceSignal

示例数据:

{'deviceID': 'device_1_1', 'deviceType': '油烟机', 'deviceSignal': 23, 'time': '1668848417'} {'deviceID': 'device_0_4', 'deviceType': '洗衣机', 'deviceSignal': 55, 'time': '1668848418'}

 deviceID: 设备ID
 deviceType: 设备类型
 deviceSignal: 设备信号
 time : 设备发送时间戳

3、代码实现

 from pyspark import SparkConf, SparkContext
 import os
 from pyspark.sql import SparkSession
 import pyspark.sql.functions as F
 ​
 # 绑定指定的Python解释器
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 ​
 def sql():
     # SQL
     # 3.2- 拆解数据结构。将json解析得到单个的字段
     """
         get_json_object(参数1,参数2):用来解析json串。一次只能得到一个字段的值
             参数1:要解析的json字段名称
             参数2:字段的解析路径 $.字段路径
     """
     etl_df = spark.sql("""
         select
             get_json_object(value,'$.deviceID') as deviceID,
             get_json_object(value,'$.deviceType') as deviceType,
             get_json_object(value,'$.deviceSignal') as deviceSignal,
             get_json_object(value,'$.time') as time
         from iot
     """)
     etl_df.createTempView("etl")
 ​
     # 3.3- 各种信号强度>30各种类型的设备数量  和  它们的平均信号强度
     result_df = spark.sql("""
         select
             deviceType,
             count(deviceID) as cnt_deviceID,
             round(avg(deviceSignal),2) as avg_deviceSignal
         from etl
         where deviceSignal>30
         group by deviceType
     """)
 ​
     # 4- 数据输出
     # 5- 启动流式任务
     result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
 ​
 ​
 def dsl():
     result_df = etl_tmp_df.select(
         F.get_json_object('value', '$.deviceID').alias('deviceID'),
         F.get_json_object('value', '$.deviceType').alias('deviceType'),
         F.get_json_object('value', '$.deviceSignal').alias('deviceSignal'),
         F.get_json_object('value', '$.time').alias('time')
     ).where('deviceSignal>30').groupBy('deviceType').agg(
         F.count('deviceID').alias('cnt_deviceID'),
         F.round(F.avg('deviceSignal'), 2).alias('avg_deviceSignal')
     )
     
     # 4- 数据输出
     # 5- 启动流式任务
     result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
 ​
 ​
 if __name__ == '__main__':
     # 1- 创建SparkSession对象
     spark = SparkSession.builder\
         .config("spark.sql.shuffle.partitions",2)\
         .appName('iot')\
         .master('local[*]')\
         .getOrCreate()
 ​
     # 2- 数据输入
     init_df = spark.readStream\
         .format("kafka") \
         .option("kafka.bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092") \
         .option("subscribe", "search-log-topic") \
         .load()
 ​
     # 3- 数据处理
     # 3.1- 数据ETL:进行数据类型转换,将value字段bytes->字符串
     etl_tmp_df = init_df.selectExpr("cast(value as string) as value")
     etl_tmp_df.createTempView('iot')
 ​
     # SQL
     # sql()
 ​
     # DSL
     dsl()

运行结果截图:

结构化流不支持的操作:

  • 多个流同时聚合

  • limit和take不能使用

  • 不能使用去重操作

  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1938641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浅谈芯片验证中的仿真运行之 timescale (五)提防陷阱

一 仿真单位 timeunit 我们知道,当我们的代码中写清楚延时语句时,若不指定时间单位,则使用此单位; 例如: `timescale 1ns/1ps 则 #15 语句表示delay15ns; 例:如下代码,module a 的timescale是1ns/1ps, module b 是1ps/1ps; module b中的clk,频率是由输入参…

【LeetCode】222. 完全二叉树的个数

什么是计算机基础?如果本题能够用二分二进制二叉树的方式解出本题,那么我可以认为你的计算机基础就很好了。很久以来,我一直认为自己的计算机基础好,但是自刷题以来,跟网上这么多优秀的同学相比,我发现我实…

vxe-grid 实现配置式form搜索条件 form搜索条件框可折叠 配置式table

文章目录 效果图代码 效果图 代码 <template><div class"app-container"><vxe-grid refxGrid v-bind"gridOptions" v-if"tableHeight" :height"tableHeight"><template #billDate"{ data }"><e…

FPGA实验5:4位加法计数器

实验目的及要求 掌握时钟信号、进程和BUFFER端口的运用&#xff1b;了解计数器的设计、仿真和硬件测试&#xff0c;进一步熟悉VHDL语句、语法及应用等。 实验原理 运用Quartus II 集成环境下的VHDL文本设计方法设计4位加法计数器&#xff0c;进行波形仿真和分析、引脚分配…

【Apache Doris】周FAQ集锦:第 15 期

【Apache Doris】周FAQ集锦&#xff1a;第 15 期 SQL问题数据操作问题运维常见问题其它问题关于社区 欢迎查阅本周的 Apache Doris 社区 FAQ 栏目&#xff01; 在这个栏目中&#xff0c;每周将筛选社区反馈的热门问题和话题&#xff0c;重点回答并进行深入探讨。旨在为广大用户…

从数据湖到湖仓一体:统一数据架构演进之路

文章目录 一、前言二、什么是湖仓一体&#xff1f;起源概述 三、为什么要构建湖仓一体&#xff1f;1. 成本角度2. 技术角度 四、湖仓一体实践过程阶段一&#xff1a;摸索阶段(仓、湖并行建设)阶段二&#xff1a;发展阶段方式一、湖上建仓(湖在下、仓在上)方式二&#xff1a;仓外…

一种优雅的方法获取PyInstaller打包Python程序的资源路径和目录路径

1、需求分析 运行Python程序的时候需要获取的资源有2种&#xff1a; 一种是固定的资源文件&#xff0c;你希望启动程序的时候可以调用的&#xff0c;比如数据库文件和图片资源文件&#xff1b;另一种是用于存储历史记录的文件&#xff0c;你希望每次打开都会改变其中的内容&am…

【机器学习】智能驱动未来:机器学习在能源效率提升与环境管理中的创新应用

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀目录 &#x1f50d;1. 引言&#x1f4d2;2. 机器学习能源环境领域的应用潜力&#x1f304;能源效率提升&#x1f3de;️环境管理⛰️具体案例…

动漫风格动漫404网站维护HTML源码

源码介绍 动漫风格动漫404网站维护HTML源码&#xff0c;源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面 效果预览 源码下载 动漫风格动漫404网站维护HTML源码

收银系统源码-千呼新零售收银视频介绍

千呼新零售2.0系统是零售行业连锁店一体化收银系统&#xff0c;包括线下收银线上商城连锁店管理ERP管理商品管理供应商管理会员营销等功能为一体&#xff0c;线上线下数据全部打通。 适用于商超、便利店、水果、生鲜、母婴、服装、零食、百货、宠物等连锁店使用。 详细介绍请…

XMl基本操作

引言 使⽤Mybatis的注解⽅式&#xff0c;主要是来完成⼀些简单的增删改查功能. 如果需要实现复杂的SQL功能&#xff0c;建议使⽤XML来配置映射语句&#xff0c;也就是将SQL语句写在XML配置⽂件中. 之前&#xff0c;我们学习了&#xff0c;用注解的方式来实现MyBatis 接下来我们…

传输层和网络层的关系,ip协议+ip地址+ip报头字段介绍(4位TOP字段,8位生存时间(ttl)),ip地址和端口号的作用

目录 传输层和网络层的关系 引入 介绍 ip协议 介绍 ip地址 引入 数据传递过程 举例(ip地址的作用) ip报头 格式 4位版本号 ip地址不足的问题 8位服务类型 4位TOP(type of service)字段 最小延时 最大吞吐量 4位首部长度 16位总长度 8位协议号 首部校验和…

科研绘图系列:R语言分割小提琴图(Split-violin)

介绍 分割小提琴图(Split-violin plot)是一种数据可视化工具,它结合了小提琴图(violin plot)和箱线图(box plot)的特点。小提琴图是一种展示数据分布的图形,它通过在箱线图的两侧添加曲线来表示数据的密度分布,曲线的宽度表示数据点的密度。而分割小提琴图则是将小提…

LeNet实验 四分类 与 四分类变为多个二分类

目录 1. 划分二分类 2. 训练独立的二分类模型 3. 二分类预测结果代码 4. 二分类预测结果 5 改进训练模型 6 优化后 预测结果代码 7 优化后预测结果 8 训练四分类模型 9 预测结果代码 10 四分类结果识别 1. 划分二分类 可以根据不同的类别进行多个划分&#xff0c;以…

【数据分享】2013-2022年我国省市县三级的逐月SO2数据(excel\shp格式\免费获取)

空气质量数据是在我们日常研究中经常使用的数据&#xff01;之前我们给大家分享了2000——2022年的省市县三级的逐月PM2.5数据和2013-2022年的省市县三级的逐月CO数据&#xff08;均可查看之前的文章获悉详情&#xff09;&#xff01; 本次我们分享的是我国2013——2022年的省…

Langchain-Chatchat-Ubuntu服务器本地安装部署笔记

Langchain-Chatchat&#xff08;原Langchain-ChatGLM&#xff09;基于 Langchain 与 ChatGLM 等语言模型的本地知识库问答 | Langchain-Chatchat (formerly langchain-ChatGLM), local knowledge based LLM (like ChatGLM) QA app with langchain。 开源网址&#xff1a;https:…

基于NeRF的路面重建算法——RoME / EMIE-MAP / RoGS

基于NeRF的路面重建算法——RoME / EMIE-MAP / RoGS 1. RoMe1.1 Mesh Initialization / Waypoint Sampling1.2 Optimization1.3 Experiments 2. EMIE-MAP2.1 Road Surface Representation based on Explicit mesh and Implicit Encoding2.2 Optimizing Strategies2.3 Experimen…

基于面向对象和递归的拦截器设计模式

1 定义 拦截器模式&#xff08;Interceptor Pattern&#xff09;&#xff0c;是指提供一种通用的扩展机制&#xff0c;可以在业务操作前后提供一些切面的&#xff08;Cross-Cutting&#xff09;的操作。这些切面操作通常是和业务无关的&#xff0c;比如日志记录、性能统计、安…

SciPy版本与Python和NumPy各个版本的兼容性

但是现在我用Scipy1.13.1&#xff0c;Python3.10&#xff0c;NumPy2.0.0&#xff0c;使用Scipy时会报错&#xff0c;将NumPy 版本降低为1.26.4以后&#xff0c;就没有报错了。

C++ | Leetcode C++题解之第268题丢失的数字

题目&#xff1a; 题解&#xff1a; class Solution { public:int missingNumber(vector<int>& nums) {int n nums.size();int total n * (n 1) / 2;int arrSum 0;for (int i 0; i < n; i) {arrSum nums[i];}return total - arrSum;} };