图数据的存储结构
键值对存储因具有可扩展强、结构简单、查找迅速等特点被广泛应用于图查询系统中,如Wukong、Trinity.RDF。在Wukong系统中,图上的边会转换成键值对进行存储,将顶点编号、边的类型、边的方向、值的地址和大小等信息组合成键(key),对应邻居顶点构成值(value),如图1所示。当需要查询顶点1、边类型为2的所有入边(in)时,先通过Hash函数找到对应的键的存储位置,然后根据键得到值的存储地址(offset),最后再通过远端或者本地访问的方式获取值的信息,即对应的邻居有顶点8和顶点9。
图1 键值对存储
在图计算系统中广泛使用压缩稀疏矩阵来存储图的结构,如图2所示,包括GraphLab、PowerGraph、Gemini等系统。行压缩稀疏矩阵(compressed sparse row,CSR)表示出边的信息,列压缩稀疏矩阵(compressed sparse column,CSC)表示入边的信息。顶点索引(vertex index)记录了每个顶点在边数组中的起始位置,并且顶点编号与顶点索引数组的序号保持一致。如顶点2,在顶点索引中的值为4,则顶点2的邻居顶点从边数组中下标为4的元素开始,一直到下一个顶点对应的索引值6,也就是说顶点1、顶点3是顶点2的邻居顶点。若该结构为CSC,则(1,2)和(3,2)是原图中的边;若为CSR,则(2,1)和(2,3)为原图中的边。压缩稀疏矩阵的图存储方式对于遍历图上所有边的计算而言是高效的。
图计算系统的图划分和执行模式
在图计算系统中,图划分在减少数据跨机器通信、负载均衡等方面发挥着很重要的作用。目前的划分方式可以分为边划分(edge-cut)和点划分(vertex-cut),如图3所示。
边划分是指图从边切开,每个顶点被放置在一台服务器上(通常通过Hash的方式),也就是该顶点对应的边信息都存储在该机器上,其他服务器上只有该顶点的镜像顶点,因此每条边会在多台机器上出现。边划分的优点是计算过程中对邻居顶点信息的聚集都可以在本地完成;缺点是对于幂律分布的图,会出现负载不均衡的问题。幂律分布的图的特点是少部分的点拥有大量的边,因此拥有着这些点的机器的信息计算和通信开销会远大于其他的机器。点划分是将每条边唯一放置在一台机器上,顶点可能会被切分在不同的机器中。点划分的优点是对于幂律分布的图也能实现很好的负载均衡。但是存在的问题是,在计算的过程中,由于一个顶点被切分在不同服务器上,则聚合邻居顶点的信息需要进行跨机器通信。还有一些工作是将点划分和边划分的方法相互结合,为图上不同的顶点提供不同的划分方法。
图计算引擎的实现通常有两种方式:基于推送(push-based)模式和基于拉取(pull-based)模式。基于推送模式是对源顶点进行遍历,然后源顶点将自身的状态通过出边更新邻居顶点的状态。相反地,基于拉取模式是对目标顶点进行遍历,通过入边拉取邻居顶点的状态更新自己。相比于基于推送模式的更新邻居顶点(写)操作,基于拉取模式的引擎只需要拉取邻居顶点的信息(读)即可,因此其能够达到更高的计算吞吐率。基于推送模式比较适合图中活跃顶点较少的算法,可以方便地跳过该轮迭代中没有活跃的顶点,减少计算量。同时也有系统混合使用了两种更新方式,在执行的过程中动态地选择适合的更新模式,如Gemini、Polymer等系统。
基于拉取模式的消息传递
图计算引擎的消息传递模式与图的划分方式有很大的关系,因为图数据划分的模式影响了顶点收集邻居顶点的消息来更新自己的方式。在Wukong中,键值对的存储模式事实上是一种边划分的方式,即每一个顶点只属于一台服务器,在其他服务器上的只是它的镜像顶点。
根据图查询系统的数据划分特点,本文使用基于拉取模式的消息传递,类似于Ligra、Polymer等系统中使用的pull模式。在每轮迭代中主要分为两个步骤进行,如图1所示。
步骤1 每台服务器上的顶点拉取其入边顶点的消息来更新自身的值。例如顶点2通过入边信息,聚合邻居顶点1、顶点3的值,然后更新自己的值。
步骤2 每台服务器上的顶点会将步骤1中更新的值发送给其他机器,更新其镜像顶点的值,到此一轮迭代的计算完成。例如服务器0上的顶点2、顶点4会发送信息给服务器1,以更新服务器1上顶点2、顶点4的镜像顶点的值。
在图查询系统Wukong中选择拉取模式而不是推送模式,是由其数据的存储模式决定的。因为每台服务器存储的信息是主顶点(master)聚集起来的,如果选择推送的模式,则每个顶点需要发送信息更新它的出边邻居顶点,发送的消息数量为O(E)(E表示边的数量,发送消息的数量与边的数量成正比)。
例如服务器0上的顶点2需要更新服务器1上的顶点1、顶点3,因为服务器1上没有顶点2的邻居信息(只能通过顶点1、顶点3访问顶点2,不能通过顶点2访问顶点1、顶点3),因此服务器0需要发送两条信息,分别更新服务器1上的顶点1和顶点3。
而在拉取模式下,顶点的聚合操作都是在本地进行的,不同服务器间只需要进行主顶点和镜像顶点的通信即可,消息发送数量由O(E)减少为O(V)(V表示顶点的数量)。
同时,对于不同机器间的顶点更新,本文采用了批量更新(batch)的方法,以减少单次数据更新的开销。批量更新是指将需要更新的顶点数据聚集在一起,然后一次性发送给其他的机器进行更新,而不是每个顶点单独发送一条更新消息。
批量更新的方法虽然增加了单次数据发送的时间,但是大大地降低了数据发送的次数,因此平均下来每一条数据的传播时间被极大地缩短。
负载均衡
负载均衡是分布式并行计算系统一个重要的研究方向。对于一个同步的图计算引擎来说,计算的时间取决于最慢的机器的执行时间。其中,同步的图计算引擎是指新一轮迭代的开始需要等待所有的点完成上一轮迭代。因此,不同机器间以及单个机器中不同线程间的计算任务需要尽可能均衡。不同机器间的负载均衡由图的划分来保证,本文主要关注单台机器上不同线程间的负载均衡问题。针对该问题,笔者提出两个优化方案:基于边数量的任务划分和任务窃取。
基于边数量的任务划分方法是基于Grazelle系统中的思想提出的,指依据边的数量为每个线程划分负责的点的数量。拉取引擎的计算过程包括两层循环,外层循环对所有目标顶点进行遍历,内层循环对每个目标顶点通过入边聚集源顶点的信息。不同的系统通常在外层循环中使用并行方法进行优化,即每个线程负责不同的目标顶点的计算。一种简单的划分策略是按照外层循环的顶点数量进行划分,但不同顶点对应边的数量不一致,这可能导致不同线程的计算量差异较大。因此,本文基于边数量预先为每个线程分配好需要负责的顶点。如图7所示,将下面的计算任务划分给两个线程,线程0负责0号顶点,线程1负责1~5号顶点,每个线程中的计算都包含了7条边。如果使用基于点的数量的任务划分方法,则线程1负责0~2号顶点,一共10条边,而线程2负责3~5号顶点,一共4条边,会出现负载不均衡的问题。
任务窃取技术被广泛应用在分布式并行系统中,它让已经完成任务的线程“窃取”其他线程未完成的任务来执行。在本系统中其可以与基于边数量的任务划分技术共同使用,具体实现如下:首先每个线程维护一个任务队列;然后将被分配好的任务划分成更多的子任务,保存在各自的任务队列里;最后每个线程从各自的任务队列里获取子任务并执行,当任务队列为空时,检查旁边线程的任务队列,“窃取”其他线程的任务来执行