spark graph基础(一)

news2025/1/11 23:56:34

1 overView

1.1 图的构成

图由节点和边组成,其中VertexRDD[VD] 和EdgeRDD[ED] 继承和优化了 RDD[(VertexId, VD)] 和RDD[Edge[ED]] 。

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

1.2 图使用示例

如下图所示,使用spark Graph表示为图 ,其中用户节点有两个属性。每条边有一个属性。
① 构建图

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

② 获取节点和边
graph.vertices 返回 VertexRDD[(String, String)] 其中 VertexRDD[(String, String)]继承于 RDD[(VertexId, (String, String))] 。

graph.edges 返回 EdgeRDD ,EdgeRDD 包含 Edge[String] 对象。

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

③ EdgeTriplet
EdgeTriplet 包含 srcAttr 和 dstAttr属性 ,继承于Edge,

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

Graph

2 图的基本操作

2.1 常用图操作算子

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

2.2 属性操作

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

2.3 图结构操作

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

子图操作

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

2.4 join 操作

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

2.5 Neighborhood Aggregation

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

the aggregateMessages operator to compute the average age of the more senior followers of each user.

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst(1, triplet.srcAttr)
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) =>
    value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

2.6Map Reduce Triplets Transition Guide

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
      reduce: (Msg, Msg) => Msg)
    : VertexRDD[Msg]
}

The following code block using mapReduceTriplets:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
  Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

can be rewritten using aggregateMessages as:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

2.7 Computing Degree Information

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

2.8 Collecting Neighbors

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

These operators can be quite costly as they duplicate information and require substantial communication. If possible try expressing the same computation using the aggregateMessages operator directly.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/383316.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Typro使用以及安装教程来啦

Typora是一款轻便简洁的Markdown编辑器,支持即时渲染技术,这也是与其他Markdown编辑器最显著的区别。即时渲染使得你写Markdown就想是写Word文档一样流畅自如,不像其他编辑器的有编辑栏和显示栏。今天为大家分享下有关Typroa的安装以及使用&a…

TryHackMe-黑我杯

黑我杯 相信我们大家在TryHackMe的日积月累都学到了不少东西,从纯萌新到oscp再到更高 我很高兴能将国内各thm玩家聚集到一起,构建一个更好的学习环境和氛围 本次娱乐分两场: Offensive Pentesting — 中等难度Junior Penetration — 容易难…

@Autowired和@Resource到底有什么区别

Autowired 和 Resource 都是 Spring/Spring Boot 项目中,用来进行依赖注入的注解。它们都提供了将依赖对象注入到当前对象的功能,但二者却有众多不同,并且这也是常见的面试题之一,所以我们今天就来盘它。 Autowired 和 Resource 的…

Linux 确认 NTP 是否同步成功

NTP 即Network Time Protocol,它通过网络同步计算机系统之间的时钟。NTP 客户端会将其时钟与 NTP 服务器同步。NTP同步状态可以通过以下三个命令查询:ntpq:ntpq 是标准的 NTP 查询程序。ntpstat:显示网络时间同步的状态。timedate…

【房间墙上凿个洞,看你在干嘛~】安全攻防内网渗透-绕过防火墙和安全检测,搭建DNS隐蔽隧道

作者:Eason_LYC 悲观者预言失败,十言九中。 乐观者创造奇迹,一次即可。 一个人的价值,在于他所拥有的。所以可以不学无术,但不能一无所有! 技术领域:WEB安全、网络攻防 关注WEB安全、网络攻防。…

Spark 广播/累加

Spark 广播/累加广播变量普通变量广播分布式数据集广播克制 Shuffle强制广播配置项Join Hintsbroadcast累加器Spark 提供了两类共享变量:广播变量(Broadcast variables)/累加器(Accumulators) 广播变量 创建广播变量…

快速上手配置firewalld

firewalld使用firewall-cmd命令配置策略。 查看当前firewalld当前服务运行状态 firewall-cmd --state firewalld防火墙状态还用使用如下命令查看状态 systemctl status firewalld 查看所有打开运行的端口 firewall-cmd --zonepublic --list-ports 查看区域信息情况 firewall…

qml学习之qwidget与qml结合使用并调用信号槽交互

学习qml系列之一说明: 学习qml系列之qwiget和qml信号槽的交互使用,并在qwidget中显示qml界面 在qml中发送信号到qwidget里 在qwidget里发送信号给qml 在qwidget里面调用qml界面方式 方式一:使用QQuickView 这个是Qt5.0中提供的一个类&…

小白量化《穿云箭集群量化》(5)抄底雷达策略

小白量化《穿云箭集群量化》(5)抄底雷达策略 雷达能够提前发现远处敌我动向。雷达是现代战争不可或缺的装备。 证券市场中分三类人,先知先觉者,后知后觉者,不知不觉者。先知先觉者往往是市场主力,他们拥有信…

Feign踩坑源码分析 -- 请求参数分号变逗号

一.案例 1.1.Post请求: http://localhost:8250/xx/task/test json格式参数: {"string": "a;b;c;d" } 1.2.controller代码: AutowiredDataSourceClientService dataSourceClientService;RequestMapping("/test"…

《计算机原理》——HelloWorld.cpp如何运行的

学校《计算机原理》开课啦!特此开辟专栏,将一些知识作为笔记,记录下来。 前言 本篇博客知识点来源于educoder的相关题目 1. 相关知识 1.1 计算机语言 计算机语言是人与计算机之间通讯的语言,计算机语言包括编写计算机程序的字符…

[MatLab]图像绘制

一、绘制二维图像 1.一张图上绘制一条线 绘制代码如下面所示: x 0:0.01:2*pi; y sin(x); figure %建立幕布 plot(x,y) %绘制图像 %设置图像属性 title(ysin(x)) xlabel(x) ylabel(y)xlim([0 2*pi]) %限制x轴的值域 自定义图线的颜色…

GB28181协议--SIP协议介绍

1、SIP协议简介 SIP(Session Initiation Protocol,会话初始协议)是一个用于建立、更改和终止多媒体会话的应用层控制协议,其中的会话可以是IP电话、多媒体会话或多媒体会议(GB28181安防使用的是SIP协议)。S…

lab备考第二步:HCIE-Cloud-Compute-第一题:FusionCompute

第一题 FusionCompute 一、题目介绍 1.1. 扩容CAN节点与对接共享存储(必选) 题目及【考生提醒关键点】 扩容一台CNA节点,配置管理地址设置为:192.168.100.212。密码设置为:Cloud12#$。【输入之前确认自己的大小写是否…

任务类风险漏洞挖掘思路

任务类风险定义: 大部分游戏都离不开任务,游戏往往也会借助任务,来引导玩家上手,了解游戏背景,增加游戏玩法,提升游戏趣味性。任务就像线索,将游戏的各个章节,各种玩法,…

docker上安装nacos

文章目录一、docker安装nacos简单版1.拉取镜像2、挂载目录,用于映射到容器,目录按自己的情况创建3、mysql新建nacos-config的数据库,并执行脚本 sql脚本地址如下:4、修改配置文件custom.properties5、启动容器6、访问二、docker安…

错误:PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。“+文件路径“的解决方案

最近在使用python进行筛选图片的时候,想到用python里面的os库进行图片的删除。 具体筛选方法就是,删除掉图片长度或宽度小于100像素的图片,示例代码如下所示: for file in os.listdir(img_path):if file .split( . )[ - 1 ] j…

深度强化学习DLR

1 强化学习基础知识 强化学习过程:⾸先环境(Env)会给智能体(Agent)⼀个状态(State),智能体接收到环境给的观测值之后会做出⼀个动作(Action),环境接收到智能体给的动作之后会做出⼀系列的反应,例如对这个动作给予⼀个奖励(Reward…

射频功率放大器基于纵向导波的杆状构件腐蚀诊断方法的研究

实验名称:基于纵向导波的杆状构件腐蚀诊断方法研究方向:无损探伤测试设备:信号号发生器、安泰ATA-8202功率放大器、数据采集卡、直流电源、超声探头、钢杆、前置放大器。实验过程:图:试验装置试验装置如图3.2所示。监测…

Android Handler机制(四) Message源码分析

一. 简介 接上一篇文章:Android Handler机制(三) Looper源码分析 ,我们来继续分析一下Message源码 这一系列文章都是为了深入理解Handler机制. Message 作为消息传递的载体,源码主要分为以下 几个部分: 1. 操作数据相关,类似 getter()和 setter()这种…