文章目录
- 13. 图计算
- 13.1 图计算简介
- 13.2 Pregel简介
- 13.3 Pregel图计算模型
- 13.3.1 有向图和顶点
- 13.3.2 Pregel的计算过程
- 13.3.2 Pregel实例
- 13.4 Pregel的C++ API
- 13.4.1 定义Vertex基类
- 13.4.2 消息传递机制和Combiner
- 13.4.3 Aggregator、拓扑改变和输入输出
- 13.5 Pregel的体系结构
- 13.5.1 Pregel的执行过程和容错性
- 13.5.2 Worker、Master和Aggregator
- 13.6 Pregel的应用实例:单源最短路径
- 13.7 Hama的安装和使用
13. 图计算
13.1 图计算简介
-
图计算是专门针对图数据结构的处理
- 许多大数据都是以大规模图或者网络的形式出现
- 许多非图结构的大数据,也常常会被转换为图模型后进行分析
- 图数据结构很好地表达了数据之间的关联性
- 关联性计算是大数据计算的核心—通过获得数据的关联性,可以从噪音很多的海量数据中抽取有用的信息
-
图的应用实例
- 购物者之间进行建模,可以得到兴趣比较相似的用户,为用户实时推荐商品
- 图结构计算可以发现传播关系中的意见领袖,如热门话题讨论
-
传统的图计算算法存在的典型问题
-
常常表现出比较差的内存访问局限性
-
针对单个顶点的处理工作过少
-
计算过程中伴随着并行读的改变
解决这些典型图计算的解决方案:
-
为特定的图应用定制相应的分布式实现
-
基于现有的分布式计算平台进行图计算:如MapReduce,图计算是细粒度的多次迭代工作, 因此在MapReduce上性能较差
-
使用单机的图算法库:BGL、LEAD、NetworkX、JDSL、Stanford、GraphBase和FGL等,但对于大规模的图计算任务有很大局限性
-
使用已有的并行图计算系统,比如Paeallel BGL和CGM Graph,实现了很多并行图算法
-
-
-
通用的图计算软件:解决传统图计算存在的典型问题
-
是基于遍历算法的、实时的图数据库,如Neo4j、OrientDB、DEX、Infinite Graph等
-
以图顶点为中心的、基于消息传递批处理的并行引擎,如GoldenOrb、Giraph、Pregel、Hama
-
这些软件都是基于通用模型:BSP(Bulk Synchronous Parallel Computing Model)模型,也叫整体同步并行计算模型或者大同步模型
-
其通过网络连接起来的处理器,去完成图计算的并行任务,其包括了一系列的全局超步
-
一个超步的垂直结构图
- 局部计算:每个参与的处理器都有自身的计算任务,只读取存储在本地内存中的值,而不会读取其他存储器上的值,各处理器之间都是异步独立执行的
- 通讯:不同处理器处理完数据之后,要交换彼此数据,完成下一轮的迭代计算,迭代过程中通过消息方式将计算传递给其他处理器
- 栅栏同步:一个处理器执行完毕后,遇到栅栏会停止等待,等待所有处理器都执行完毕后,再进入下一轮迭代
-
-
13.2 Pregel简介
-
Pregel是谷歌公司发布的一款商业图计算产品
-
谷歌在后Hadoop时代的新“三架马车”
- Caffeine:帮助谷歌快速实现大规模网页索引构建
- Dremel:实时交互式分析产品,支持分析PB级别的数据
- Pregel:基于BSP模型实现的并行图处理系统
13.3 Pregel图计算模型
13.3.1 有向图和顶点
-
有向图顶点
-
Pregel计算模型以有向图作为输入
-
有向图的每个顶点都有一个String类型的顶点ID
-
每个顶点都有一个可修改的用户自定义值与之关联
-
每条有向边都和其源顶点关联,并记录了其目标顶点ID
-
边上有一个可修改的用户自定义值与之关联
-
-
计算模型:BSP模型
-
顶点间消息传递
- 基本方法:基于远程读取和基于共享内存
- 远程读取:将数据存储在本地磁盘,然后通知其来磁盘中读取
- 基于共享内存:基于共享内存,相关顶点都将数据写入共享内存,顶点之间都访问共享内存读取数据
-
注意:Pregel没有采取以上两种方式,而主要采取消息传递模型
远程读取会带来高延迟,Pregel不需要远程读取就可以避免远程读取带来的高延迟
基于内存共享的模型的可扩展性不高,Pregel没有采用共享内存的方式,扩展性较好
- 基本方法:基于远程读取和基于共享内存
-
13.3.2 Pregel的计算过程
-
Pregel计算过程
-
Pregel的计算过程是由被称为“超步”的迭代组成的
-
在每个超步中,每个顶点上面会并行执行用户自定义的函数
-
-
Pregel的迭代何时结束?
-
当所有顶点都处于非活跃状态,并且所有顶点都不在有消息传递时,结束算法
-
13.3.2 Pregel实例
-
实例:求最大值的Pregel计算过程图
- 超步0:不进行最大值计算,将自身的值通过出射边传递给目标顶点
- 超步1:若B点,它接受超步1中A和C顶点的值,发现都比自身的值小,即自身的值没发生变化,就会让自己变成非活跃状态,顶点C同理
- 超步2:由于B处于不活跃状态,此时没有其他顶点给A发送信息,所以A变为不活跃状态;由于A是活跃的,因此A在超步2中会给B发送一个6,但是不改变B本身的值,因此B仍为非活跃状态;在超步1中,由于D是活跃的,向C发送一个6,使得C的值改变,即从不活跃变为活跃状态
- 超步3:结束后所有节点都是非活跃状态
13.4 Pregel的C++ API
13.4.1 定义Vertex基类
-
Pregel已经预先定义好一个基类:Vertex
- Compute:虚函数,用户定义的每个顶点上的实现函数
- vertex_id:每个顶点都有一个string类型的id
- superstep:表示当前执行到了哪个超步
- GetValue:获得顶点当前关联的值
- MutableValue:用户可以修改当前顶点关联的值
- GetOutEdgeIterator:活得当前顶点的所有出射边
- SendMessageTo:沿着当前顶点的出射边,向外发送其他消息
- VoteToHalt:将当前顶点状态由活跃状态转为非活跃状态
- 在Vertex类中,定义了三个值类型的参数,分别表示顶点,边喝消息。每个顶点都有一个给定类型的值与之对应
- Pregel程序:编写Pregel程序时,需要继承Vertex类,并且覆写Vertex类的虚函数Compute()
- 每个顶点都运行相同的Pregel程序
- 每个顶点的状态都可修改,在同一超步中,状态修改后对该顶点立即可见,但对于其他顶点不可见
- 对整个过程中,唯一需要持久化的状态是顶点和边的对应的关联值
13.4.2 消息传递机制和Combiner
-
Pregel的消息传递机制
-
顶点之间的通讯是借助于消息传递机制来实现的,每条消息都包含了消息值和需要到达的目标顶点ID
-
在一个超步S中,一个顶点可以发送任意数量的消息,这些消息将在下一个超步(S+1)中被其他顶点接收
- Compute从迭代器取得上个超步的数据结果时,不能保证消息到达的顺序,只能保证消息一定会被传送,且不会被重复的传送
-
-
Combiner
-
Pregel计算框架在消息发出去之前,Combiner可以将发送往同一个顶点的多个整型值进行求和得到一个值,只需要向外发送这个“求和结果”,从而实现了由多个消息合并成一个消息,大大减少了传输和缓存的开销
-
实例
- Max Combiner 可以将消息进行合并,只将可能的Max Combiner发送给B
默认情况下,Prege计算框架并不会开启Combiner功能
当用户打算开启Combiner功能时,可以继承Combiner类并覆写虚函数Combine()
此外,通常只对那些满足交换律和结合律的操作才可以去开启Combiner功能
-
13.4.3 Aggregator、拓扑改变和输入输出
-
Aggregator:提供了一种全局通信监控和数据查看的机制
- 在超步S中,每个顶点都可以向Aggregator提供一个数据,Pregel计算框架会对这些值进行聚合操作产生一个值,在下一个超步(S+1)中,图中的所有顶点都可以看见这个值
- 实例
- Aggregator:定义了一个“Sum”,Aggregator来统计每个顶点的出射边数量
- AggregatorAggregator实现全局协调功能
- 设计“and” Aggregator来决定在某个超步中Compute()函数是否执行某些逻辑分支
- 只有当“and” Aggregator显示所有顶点都满足了某条件时,才去执行这些逻辑分支
-
拓扑改变
-
如求最小生成树,一定会删除一些冗余的边,发生拓扑的改变
-
Pregel计算框架允许在Compute()函数中修改图的拓扑结构
-
对于全局拓扑改变,Pregel采用了惰性协调机制
-
发出改变请求,必须将某个请求发给顶点v,告诉其要删除这条边;可能有好几个顶点发出请求要求顶点v删除边
-
顶点v会收到多个这个请求,刚开始收到请求,顶点v不会对其进行协调;当改变请求到达目标顶点,并且目标顶点需要执行的时候,
才会对请求进行协调
-
-
对于本地的拓扑改变,是不会引发冲突的,顶点或边的本地增减能够立即生效,很大程度上简化了分布式编程
-
-
输入输出
- 数据最开始可能不是图结构,会将其保存至文本文件、关系数据库、或者键值数据库中
- Pregel中“从输入文件生成得到图结构”和“执行图计算”这两个过程是分离的,从而不回限制输入文件的格式
- 对于输出,Pregel采用灵活的方式,可以以多种方式进行输出
13.5 Pregel的体系结构
13.5.1 Pregel的执行过程和容错性
-
Pregel的执行过程
-
在Pregel计算框架中,一个大型图会被划分成许多个分区,每个分区都包含了一部分顶点以及以其为起点的边
-
一个顶点应该被分到哪个分区上
-
是由一个函数决定的,系统默认函数为hash(ID) mod N,其中,N为所有分区总数,ID是这个顶点的标识符
-
-
Pregel用户程序的执行过程
-
1.选择Master、Worker
- Worker通过名称服务系统找到Master的位置,向Master发送注册信息,这样Master可以掌握各个Worker之间的位置信息,这样Master可以给各个Worker发送命令,协调各个Worker之间的工作
-
2.Master把图分为多个分区
- 每个Worker向外发送消息、同时也能接受自己的消息
-
3.Master把用户输入划分成多个部分
- 如果Worker从输入内容加载的数据
-
4.Master向Worker发送指令
-
5.所有Worker的节点都处于停机状态后,结束计算
-
-
-
容错性
- Pregel采用检查点机制来实现容错。在每个超步的开始,Master会通知所有的Worker把自己管辖的分区的状态写入到持久化存储设备,
- 状态如顶点值、边的值、接受到的消息等
- Master会周期性地向每个Worker发送ping消息,Worker收到ping消息后会给Master发送反馈消息,每个Worker上都保存一个或者多个分区的状态
- Master如果有很长一段时间没有收到来自Worker的消息,其就将Worker标记为失效;同样Worker在指定时间没收到Worker的ping消息,还会自动停止工作
- 当一个Worker发生故障时,它所负责维护的分区的当前状态信息就会丢失
- Master监测到一个Worker发生故障“失效”后,会把失效Worker所分配到的分区,重新分配到其他处于正常工作状态的Worker集合上
- Pregel采用检查点机制来实现容错。在每个超步的开始,Master会通知所有的Worker把自己管辖的分区的状态写入到持久化存储设备,
13.5.2 Worker、Master和Aggregator
-
Worker:一般在执行过程中,它的信息是保存在内存当中,包括
- 顶点当前值
- 出射边列表:每条出射表包括(目标顶点id,和边值)
- 消息队列:包括接收到的发送个这个顶点的消息
- 标志位:标志顶点是否是活跃状态
-
Worker会对自己所管辖的分区中的每个顶点进行遍历,并调节顶点上的Compute()函数,Compute参数包括
- 顶点当前值
- 消息迭代器
- 出射边迭代器
-
Worker:在Pretzel中,为了获得更好的性能,“标志位”和输入消息队列是分开保存的
-
为什么需要保存两份标志位和消息队列?
- 其中一份用于当前超步,另一份用于下一个超步
- 当前超步对列保存的是上一个顶点给其发送的消息,在执行过程中其他顶点给其他发送的消息放在下一个超步的队列中,在下一个超步进行处理
-
如果一个订单在超步S接收到消息,表示V将会在下一个超步S+1中(而不是当前超步S中)处于“活跃状态”
-
顶点v如何向顶点u发送消息?
- 如果顶点v和目标顶点u在同一个机器,则直接将消息放入到与目标顶点U对应的消息队列中
- 如果目标顶点u在远程机器,则会将消息先暂存到本地,当缓存中的消息数目达到一个事先设定的阈值时,这些缓存消息会被批量异步发送出去,传输到目标点所在的Worker上
-
-
Master
- Master扮分演管家的角色,主要协调Worker执行各个任务
-
Master维护着关于当前处于 “有效”状态的所有worker的各种信息,包恬每个Worker的ID和地址信息,以及每个Worker被分配到的分区信息
- Master中保存这些信息的数据结构的大小,只与分区的数量有关,而与顶点和边的数量无关
-
Master指令
Master向所有处于有效状态的Worker发送相同的指令,然后等待Worker回应,在指定时间内某—个Worker没有回应,说明这个Worker已经失效了,Master会进入恢复的模式
在每个超步中,图计算的种工作,会在“路障(barrier)"之前结束
-
Master在内部运行了一个HTTP服务器来显示图计算过程中的各种信息
-
Aggregator
- 在执行图计算过程的某个超步S中,每个Worker会利用一个Aggregator対当前本地分区中包含的所有顶点的值进行归约 ,得到一个本地的局部归约值
- 在超步S结束时,所有Worker会将所有包含局部归约值的Aggregator的值进行最后的汇总,得到全局值,然后提交给Master
- 在下一个超步S+1开始时,Master会格Aggregator的全局值发送给每个Worker
13.6 Pregel的应用实例:单源最短路径
-
Pregel非常适合用来解决单源最短路径问题,实现代码如下
-
单源最短路径举例
-
超步0:顶点0开始向其他节点发送消息
-
超步1
-
13.7 Hama的安装和使用
-
Hama简介
- Hamma是Google Pregel的开源实现
- 与Hadoop适合分布式大数据处理不同,Hamma主要用于分布式的矩阵、graph、网络算法的计算
- Hamma是在HDFS上实现的BSP(Bulk Synchronous Parallel)计算框架,弥补了Hadoop在计算能力上的不足
-
Hama的安装过程
见:Hama图计算模型_厦大数据库实验室博客 (xmu.edu.cn)