YMatrix + PLPython替代Spark实现车联网算法

news2024/11/19 3:39:05

PySpark算法开发实战

一、PySpark介绍

Spark是一种快速、通用、可扩展的大数据分析引擎,PySpark是Spark为Python开发者提供的API。在有非常多可视化和机器学习算法需求的应用场景,使用PySpark比Spark-Scala可以更好地和python中丰富的库配合使用。

使用Python开发Spark需要使用到pyspark,pyspark是Spark为Python开发者提供的API。pyspark使用Py4J库,使得Python可以使用JVM对象。
在这里插入图片描述

二、运行环境搭建

操作系统 CentOS Linux release 7.8.2003 (Core)
Java 1.8.0_151
Python 3.6.13
Spark 2.4.0
Miniconda 4.5.4
pyspark 3.2.1   
pyarrow 6.0.1

Miniconda

  1. 安装Miniconda
    conda和virtualenv是Python的包管理与环境管理工具。conda的安装程序中包含conda软件包管理器和Python,不需要再单独安装Python,使用起来较为方便。Miniconda为conda精简版,大小约为50M。
    由于我们使用的Spark版本与Python版本为历史版本,需要用4.5.4版本的Miniconda(对应Python 3.6)进行安装。当前官网下载页的miniconda支持到最低3.7版本Python,需要在https://repo.anaconda.com/miniconda/上下载。根据机型选择Miniconda2-4.5.4-Linux-x86_64.sh下载。
    下载完成之后运行脚本Miniconda2-4.5.4-Linux-x86_64.sh进行安装。完成之后可以使用conda -V检查安装结果。
    conda -V conda 4.5.4
  2. 设置用于Spark的虚拟环境
    首先建立一个pyspark_env的环境
    conda create --name pyspark_env python=3.6
    新建完成之后可以从过conda activate进入虚拟环境
    conda activate pyspark_env
    进入环境之后命令行会有环境名的标识用于区分

创建好并进入pyspark_env的虚拟环境之后,我们需要安装两个Spark相关的库,pyspark和pyarrow。可以使用conda install安装或者也可使用pip,这里以使用pip安装为例:
pip3 install pyarrow pyspark
安装完毕之后可以使用conda list查看安装好的库
在这里插入图片描述

此,环境搭建中的conda部分已经完成。详细的操作可以参考Spark的最新文档,pyspark conda部署的部分是多版本通用的:Installation - PySpark 3.2.1 documentation

Spark

  1. 下载Spark
    我们下载Spark已经编译好的压缩包,所有的历史版本可以在这个链接中找到:https://archive.apache.org/dist/spark/,
    本文下载spark-2.4.0-bin-hadoop2.7.tgz
    下载完成之后解压文件。完成之后可以进入目录运行bin/spark-shell进行测试
    在这里插入图片描述

  2. Standalone模式启动集群
    Spark的集群模式总共分为四种

  • Standalone
  • Apache Mesos
  • Hadoop YARN
  • Kubernetes
    2、3、4都比较好理解,Standalone模式是Spark自身实现的资源调度框架。
    复制spark根目录下的conf/spark-env.sh.template -> conf/spark-env.sh
    在其中添加
    SPARK_MASTER_HOST = [hostname] # master的主机名
    SPARK_MASTER_PORT= 7077
    在master节点上运行
    ./sbin/start-master.sh
    启动之后可以登录webui查看,地址为IP:8080
    在这里插入图片描述

同样,在slave节点设置好环境变量之后运行
./sbin/start-slave.sh

三、Spark分布式运行算法

下面的代码是Spark 运行Pandas UDF的例子。

def scalar_pandas_udf_example(spark):
    import pandas as pd
    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType
    def multiply_func(a, b):
        return a * b
    multiply = pandas_udf(multiply_func, returnType=LongType())
    x = pd.Series([1, 2, 3])
    print(multiply_func(x, x))
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    df.select(multiply(col("x"), col("x"))).show()
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master('local')\
        .appName("UDFTest") \
        .getOrCreate()
    print("Running pandas_udf scalar example")
    scalar_pandas_udf_example(spark)
    spark.stop()

首先生成一个SparkSession对象,参数master->'local’指的是local模式运行,如果是集群的话这里local换成spark:\masterip:7077,appName->'UDFTest’定义了任务名称

spark = SparkSession \
        .builder \
        .master('local')\
        .appName("UDFTest") \
        .getOrCreate()

定义一个简单的函数

def multiply_func(a, b):
        return a * b

生成UDF对象

multiply = pandas_udf(multiply_func, returnType=LongType())

生成一个pandas的数据

x = pd.Series([1, 2, 3])

创建一个Spark dataframe对象并让spark执行UDF

df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
df.select(multiply(col("x"), col("x"))).show()

该代码运行的结果为:
在这里插入图片描述

四、Spark数据处理的缺点

  1. 一般生产环境下的数据想要使用Spark做计算,数据需要从存储的数据库->落盘文件/消息队列->Spark集群数据链路需要建设和维护
  2. PySpark + Pandas UDF处理数据,尽管利用了Apache Arrow,核心还是需要JVM与Python之间数据传输,开销大,不适用于性能敏感的场景。
    YMatrix+ PLPython处理方式
    上文描述了Spark在车联网信号分析的实际使用案例。Spark的优点很明显,作为分布式的内存计算引擎,社区活跃、支持多语言开发、易于融合其他如Hadoop等框架。但使用上的缺点在上文中也有描述:需要搭建并维护一整条新的数据链路;并且除去Scala,使用Python等其他语言研发不适合在性能要求高的场景下使用。
    那么回到我们实现车联网信号分析这个案例上,实际上最终的需求是从一部分数据中取出想要的数据、并且经过一定的要求与计算,筛选出最终的数据结果——实际上就是简单的一条SQL+代码实现算法处理数据。上述例子中,“一条SQL”变成了整条数据链路,从数据库取出数据处理完丢进消息队列,“代码实现算法”的代码加载到了Spark分布式的消费数据和运行。如果能够把“代码实现算法”这部分与下沉到数据库这层,那么我们不仅减少了维护一整套数据链路的开销,还能利用数据库的算力提供性能,加速数据流转。想要利用数据库帮我们进行数据分析与计算,我们需要MPP架构的数据库。

一、YMatrix与PLPython介绍

YMatrix
YMatrix是超融合数据库,将交易型数据库(OLTP)、分析型数据库(OLAP)和时序数据库能力融为一体的超融合型分布式数据库产品,具备严格分布式事务一致性、水平在线扩容、安全可靠、成熟稳定、兼容PostgreSQL/Greenplum协议和生态等重要特性。为万物互联的智能时代提供坚实、简洁的智能数据核心基础设施,为物联网应用、工业互联网、智能运维、智慧城市、实时数仓、智能家居、车联网等场景提供一站式高效解决方案,YMatrix为公司自主研发的国产数据库,公司拥有该产品全部知识产权。产品的架构如下。
在这里插入图片描述

YMatrix不但对经典的Greenplum数据仓库场景进行了大幅增强,而且可以极佳的支持大规模时序数据处理、支持时空数据、结构化数据和半结构化数据,一套数据库解决各种数据类型,避免为了处理不同类型数据引入不同类型的产品。实现提高开发运维效率、提升系统性能、降低整体成本的目标。
在这里插入图片描述

PLPython
PL/Python过程语言允许用Python编写 PostgreSQL函数。Python有非常多成熟的库能够提供给我们做数据分析,如numpy、pandas等。
使用PLPython方便数据分析的算法实现,可以充分利用YMatrix分布式储存和算力。
在这里插入图片描述

二、PLPython调用外部Python代码

上文中描述了如何使用Python开发Spark应用,让我们的算法能够使用Spark的分布式计算能力,整体的数据流程是从csv的数据文件中读取数据->Spark Arrow内存的数据类型中->Spark分布式计算输出结果。
相同的算法也可以通过PLPython,将数据转存到YMatrix查询计算实现
将数据导入YMatrix
使用Mxgate将csv中的数据导入数据库,在此之前需要新建表

        vin text,
        daq_time DATE,
        status INT,
        c_stat INT,
        mode INT,
        speed INT,
        mileage INT,
        t_volt INT,
        t_current INT,
        soc INT,
        dcdc_stat INT,
        isulate_r INT,
        lng BIGINT,
        lat BIGINT,
        max_volt_bat_id INT,
        max_volt_cell_id INT,
        max_cell_volt INT,
        min_volt_bat_id INT,
        min_cell_volt_id INT,
        min_cell_volt INT,
        max_temp_sys_id INT,
        max_temp_probe_id INT,
        max_temp INT,
        min_temp_sys_id INT,
        min_temp_probe_id INT,
        min_temp INT,
        max_alarm_lvl INT,
        genral_alarm INT,
        cell_volt_list text,
        cell_temp_list TEXT,
        pdate date) 
distributed BY (vin)

表格建好了之后导入数据

tail -n +1 data.csv | mxgate --source stdin --db-database test --db-master-host localhost.localdomain --db-master-port 5432 --db-user mxadmin --time-format raw --target suanfa_data  --delimiter ','

编写PLPython函数调用算法
首先我们需要把算法代码上传到服务器上,在本例中路径为/home/mxadmin/plpython/
我们需要查询suanfa_data表中的所有数据,并将结果转化成pandas的Dataframe格式,传递给算法函数去做处理

sql = "SELECT * FROM suanfa_data;"
df = psql.frame_query(sql, cnxn)


create function suanfa_detector() returns void as $$

import sys
sys.path.append('/home/mxadmin/plpython/')
from src.analyzer import analyzer

import pyodbc
import pandas.io.sql as psql
sql_result = plpy.execute("SELECT * FROM suanfa_data;")
df = pd.DataFrame.from_records(sql_result)

result = analyzer(df)
plpy.notice(result)

$$ language plpython3u;

下面是算法函数,输入是我们suanfa_detector()中sql的查询结果转换成的dataframe对象,经过数据处理,最后输出结果的dataframe

Def  analyzer(data: pandas.Dataframe)-> pandas.Dataframe:
    data_wash(data)
    sign_data_veh_state(data)
    detect_two_alarm_tuples = detect_two_analyze(data)
    return detect_two_alarm_tuples

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/374725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

监听页面滚动,给页面中的节点添加动态过渡效果

效果示例图 示例代码 <template><div class"animation-wrap"><!-- header-start --><div class"animation-header">头部</div><!-- header-end --><div class"animation-subtitle animation-show">标…

工人搬砖-课后程序(JAVA基础案例教程-黑马程序员编著-第八章-课后作业)

【案例8-4】 工人搬砖 【案例介绍】 1.任务描述 在某个工地&#xff0c;需要把100块砖搬运到二楼&#xff0c;现在有工人张三和李四&#xff0c;张三每次搬运3块砖&#xff0c;每趟需要10分钟&#xff0c;李四每次搬运5块砖&#xff0c;每趟需要12分钟。本案例要求编写程序分…

收集分享一些AI工具第三期(网站篇)

感谢大家对于内容的喜欢&#xff0c;目前已经来到了AI工具分享的最后一期了&#xff0c;目前为止大部分好用的AI工具都已经介绍给大家了&#xff0c;希望大家可以喜欢。 image-to-sound-fx (https://huggingface.co/spaces/fffiloni/image-to-sound-fx) 图片转换为相对应的声音…

【unity3d】unity即时战略游戏开发2 rts engine

A 背景 经过寻找发现有unity3d的[rts engine]&#xff0c;ue4的[template 4]等rts引擎/模板。 没有搜到相关教程&#xff0c;倒是有几个老外的ue从零开发长篇教程。 rts engine有几个试玩视频&#xff0c;尝试找了一下。那就不用虚幻了。 距离[原坤争霸 genshin craft]近了…

【ChatGPT整活大赏】写论文后自动生成视频

ChatGPT国内又火了一把&#xff0c;功能很强大&#xff0c;接下来就带大家感受一下它的强大之处&#xff0c;通过ChatGPT写一篇论文并自动生成视频&#xff0c;增加内容的可读性。 话不多说&#xff0c;先上成果&#xff1a; …

MySQL管理表

在创建表时需要提前了解mysql里面的数据类型 常见的数据类型 创建表 创建表方式1&#xff1a; 格式&#xff1a; CREATE TABLE [IF NOT EXISTS] 表名( 字段1, 数据类型 [约束条件] [默认值], 字段2, 数据类型 [约束条件] [默认值], 字段3, 数据类型 [约束条件] [默认值], ………

以FGSM算法为例的对抗训练的实现(基于Pytorch)

1. 前言 深度学习虽然发展迅速,但是由于其线性的特性,受到了对抗样本的影响,很容易造成系统功能的失效。 以图像分类为例子&#xff0c;对抗样本很容易使得在测试集上精度很高的模型在对抗样本上的识别精度很低。 对抗样本指的是在合法数据上添加了特定的小的扰动&#xff0c;…

聚类算法(下):10个聚类算法的评价指标

上篇文章我们已经介绍了一些常见的聚类算法&#xff0c;下面我们将要介绍评估聚类算法的指标 1、Rand Index Rand Index&#xff08;兰德指数&#xff09;是一种衡量聚类算法性能的指标。它衡量的是聚类算法将数据点分配到聚类中的准确程度。兰德指数的范围从0到1,1的值表示两…

Python-GEE遥感云大数据分析、管理与可视化技术及多领域案例实践应用

随着航空、航天、近地空间等多个遥感平台的不断发展&#xff0c;近年来遥感技术突飞猛进。由此&#xff0c;遥感数据的空间、时间、光谱分辨率不断提高&#xff0c;数据量也大幅增长&#xff0c;使其越来越具有大数据特征。对于相关研究而言&#xff0c;遥感大数据的出现为其提…

【阿旭机器学习实战】【37】电影推荐系统---基于矩阵分解

【阿旭机器学习实战】系列文章主要介绍机器学习的各种算法模型及其实战案例&#xff0c;欢迎点赞&#xff0c;关注共同学习交流。 电影推荐系统 目录电影推荐系统1. 问题介绍1.1推荐系统矩阵分解方法介绍1.2 数据集&#xff1a;ml-100k2. 推荐系统实现2.1 定义矩阵分解函数2.2 …

什么牌子的蓝牙耳机便宜好用?四款高品质蓝牙耳机推荐

随着时代的发展&#xff0c;蓝牙耳机的使用频率越来越高&#xff0c;不少人外出时除了带手机外&#xff0c;蓝牙耳机也成为了外出必备的数码产品之一。现在的蓝牙耳机品牌众多&#xff0c;什么牌子的蓝牙耳机便宜好用&#xff1f;下面&#xff0c;我来给大家推荐四款高品质的蓝…

ZigBee组网原理详解

关键词&#xff1a;RFD FFD ZigBee 1. 组网概述 组建一个完整的zigbee网状网络包括两个步骤&#xff1a;网络初始化、节点加入网络。其中节点加入网络又包括两个步骤&#xff1a;通过与协调器连接入网和通过已有父节点入网。 ZigBee网络中的节点主要包含三个&#xff1a;终端…

一文3000字从0到1实现基于Selenium+Python的web自动化测试框架 (建议收藏)

一、什么是Selenium&#xff1f; Selenium是一个基于浏览器的自动化测试工具&#xff0c;它提供了一种跨平台、跨浏览器的端到端的web自动化解决方案。Selenium主要包括三部分&#xff1a;Selenium IDE、Selenium WebDriver 和Selenium Grid。 Selenium IDE&#xff1a;Firefo…

阿里云服务器宝塔phpstudyIIS建站

P1 建站准备工作 1.购买云服务器 &#xff08;新用户登录阿里云有阿里云服务器一个月的试用权限&#xff0c;但是试用期的云服务器有地区限制&#xff08;不可自己选择地区&#xff09;&#xff0c;我的显示的是杭州&#xff0c;内地的服务器进行域名绑定的话&#xff0c;需要…

香港新世代加密资产网红正在崛起

2023年&#xff0c;历经兴衰的加密资产&#xff0c;在元宇宙和NFT的影响下&#xff0c;越来越多人开始关注这个领域。而在香港&#xff0c;不同的人更是成为了加密资产网红&#xff0c;引起加密资产热度的提升。香港加密资产政策促进网红崛起随着加密资产在全球的兴起&#xff…

OPPO手机删除文件数据恢复技巧篇

由于各种原因&#xff0c;所有 Android 手机上的数据都可能丢失。Oppo也是一个专注于Android操作系统的智能手机品牌。因此&#xff0c;您的 Oppo 设备上的数据也容易被删除和损坏。在本文中&#xff0c;我们将讨论 Oppo 用户恢复丢失或删除数据的不同方式。我们将详细讲解OPPO…

原始GAN-pytorch-生成MNIST数据集(原理)

文章目录1. GAN 《Generative Adversarial Nets》1.1 相关概念1.2 公式理解1.3 图片理解1.4 熵、交叉熵、KL散度、JS散度1.5 其他相关&#xff08;正在补充&#xff01;&#xff09;1. GAN 《Generative Adversarial Nets》 Ian J. Goodfellow, Jean Pouget-Abadie, Yoshua Be…

string类的理解以及模拟实现

string类的理解为什么需要学习string类标准库中的string类string类简单了解string类常见接口string模拟实现深浅拷贝问题标准库下的stringVS环境下g环境下为什么需要学习string类 在C语言中&#xff0c;字符串和字符串相关的函数是分开的&#xff0c;不太符合面向对象的思想&a…

在线视频加密播放与防下载该如何考虑?

在线视频加密播放与防下载该如何考虑&#xff1f; ▲ 图 / 防录屏随机水印 1. 视频加密&#xff08;分片加密&#xff09;VRM加密&#xff1a; 将视频进行切片、对碎片逐一进行混淆式加密&#xff0c;包括AES128加密、XOR加密、关键帧错序等。 2. 防录屏&#xff08;用名信息I…

IM即时通讯开发如何解决大量离线消息导致客户端卡顿的

大部分做后端开发的朋友&#xff0c;都在开发接口。客户端或浏览器h5通过HTTP请求到我们后端的Controller接口&#xff0c;后端查数据库等返回JSON给客户端。大家都知道&#xff0c;HTTP协议有短连接、无状态、三次握手四次挥手等特点。而像游戏、实时通信等业务反而很不适合用…