前言
中介中心性(Between Centrality),或者叫介数中心性,是基于最短路径对关系图谱中节点的中心性进行测量的典型图论算法。和其它的图论中心性算法一样,中介中心性用来衡量社会关系网络中,个人、企业或者其它的实体在整个网络中的重要程度。例如,在一个洗钱犯罪团伙的交易关系网络中,中介中心性高的账号,有可能是隶属于“庄主”的高风险账号,因为大量的资金都是通过这些账号进行归集或者拆分出去的。又例如,在交通运输领域的运用上,中介中心性不仅可以用于预测交通堵塞情况,也可以用于对城市道路运输效率进行分析。同时,又因为中介中心性算法的时间复杂度很高,所以在实际的工程化落地过程中也充满了挑战。
参考资料
[1] Freeman, Linton (1977). “A set of measures of centrality based on betweenness”. Sociometry. 40 (1): 35–41. https://www.jstor.org/stable/3033543?origin=crossref
[2] Brandes, Ulrik (2001). “A faster algorithm for betweenness centrality” . Journal of Mathematical Sociology. 25 (2): 163–177. https://pdodds.w3.uvm.edu/research/papers/others/2001/brandes2001a.pdf
[3] Ulrik Brandes and Christian Pich."Centrality Estimation in Large Networks. " International Journal of Bifurcation and Chaos 17(7):2303-2318, 2007. https://dx.doi.org/10.1142/S0218127407018403
[4] Betweenness centrality. https://en.wikipedia.org/wiki/Betweenness_centrality#CITEREFFreeman1977
中介中心性的定义
中介中心性的定义最早是Freeman提出的,一个节点 v v v的中介中心性由下面的公式给出:
C B ( v ) = ∑ s ≠ v ≠ t σ s t ( v ) σ s t ( 1 ) C_B(v)=\sum\limits_{s \neq v \neq t} \frac{\sigma_{st}(v)}{\sigma_{st}}\qquad\qquad (1) CB(v)=s=v=t∑σstσst(v)(1)
其中, σ s t \sigma_{st} σst指的是从节点 s s s到节点 t t t的所有最短路径的总数,而 σ s t ( v ) \sigma_{st}(v) σst(v)则是代码这些从节点 s s s到 t t t的最短路径中,经过节点 v v v的那部分最短路径的数量。需要注意的是,节点 v v v不能是最短路径的起点或者终点,也就是 s ≠ v ≠ t s \neq v \neq t s=v=t,因为这里的要求是“经过”。总结一句话就是,一个节点的中介中心性就是,其它任意两个节点之间的所有最短路径经过该节点的次数。中介中心性测量了某个节点在多大程度上能成为其它节点之间的“中介”,以此来衡量该节点在整个网络中的重要程度。
从上述对中介中心性的定义来看,是需要计算全图节点两两之间的最短路径数,其计算复杂度达到了 Θ ( ∣ V 3 ∣ ) \Theta(|V^3|) Θ(∣V3∣), 这同时也是一个典型的单源最短路径计算问题(single-source shortest-paths)。
一种用于中介中心性计算的快速算法
2021年,Brandes在他的论文《A Faster Algorithm for Betweenness Centrality》中提出了中介中心性算法的一种实现,将原本 Θ ( ∣ V 3 ∣ ) \Theta(|V^3|) Θ(∣V3∣)的时间复杂度降为 Θ ( ∣ V ∣ ∣ E ∣ ) \Theta(|V||E|) Θ(∣V∣∣E∣),其中 ∣ E ∣ |E| ∣E∣为全图边的总数量。Brandes因为其良好的性能,在许多实际应用中都得到了广泛的使用。
接下来,我们来看看Brandes算法如何实现。
首先, δ s t ( v ) = σ s t ( v ) σ s t \delta_{st}(v)= \frac{\sigma_{st}(v)}{\sigma_{st}} δst(v)=σstσst(v)表示节点 s s s和节点 t t t之间的所有最短路径中,经过节点 v v v的最短路径的比例。所以,显而易见,节点 v v v的中介中心性可以表示为:
C B ( v ) = ∑ s ≠ v ≠ t ∈ V δ s t ( v ) C_B(v)=\sum\limits_{s \neq v \neq t \in V}\delta_{st}(v) CB(v)=s=v=t∈V∑δst(v)
接着,我们需要再引入三个基础定义,
定义1:
d
G
(
s
,
t
)
d_G(s,t)
dG(s,t) 表示从节点
s
s
s到
t
t
t的最短路径的距离。有且只有
d
G
(
s
,
t
)
=
d
G
(
s
,
v
)
+
d
G
(
v
,
t
)
d_G(s,t)=d_G(s,v)+d_G(v,t)
dG(s,t)=dG(s,v)+dG(v,t)成立时,我们才认为有一个节点
v
v
v在节点
s
s
s和
t
t
t之间的最短路径上。
定义2:
节点
v
v
v在以
s
s
s为源点的最短路径上时,节点
v
v
v的前序节点。这些前序节点的集合定义如下:
P
s
(
v
)
=
{
u
∈
V
:
{
u
,
v
}
∈
E
,
d
G
(
s
,
v
)
=
d
G
(
s
,
u
)
+
w
(
u
,
v
)
}
P_s(v)=\{u \in V :\{u,v\} \in E, d_G(s,v)=d_G(s,u)+w(u,v)\}
Ps(v)={u∈V:{u,v}∈E,dG(s,v)=dG(s,u)+w(u,v)}
定义3:
δ
s
∗
(
v
)
=
∑
t
∈
V
δ
s
t
(
v
)
\delta_{s*}(v)= \sum\limits_{t \in V}\delta_{st}(v)
δs∗(v)=t∈V∑δst(v),
这里表示的是以
s
s
s为源点的所有节点对之间的最短路径经过节点
v
v
v的总比例。这是一个递归公式,它也是后续Brandes算法实现最短路径数快速累加的关键。
有了以上的基础概念后,我们假设当节点
s
s
s到任意节点
t
t
t之间的最短路径都有且只有一条,那么
δ
s
t
(
v
)
\delta_{st}(v)
δst(v)的结果不是1,就是0。故我们可以得到以下公式:
δ
s
∗
(
v
)
=
∑
w
:
v
∈
P
s
(
w
)
(
1
+
δ
s
∗
(
w
)
)
(
2
)
\delta_{s*}(v)= \sum\limits_{w:v\in P_s(w)}(1 + \delta_{s*}(w))\qquad\qquad (2)
δs∗(v)=w:v∈Ps(w)∑(1+δs∗(w))(2)
接下来,将公式(2)推广到一般形式,也就是节点
s
s
s到节点
t
t
t之间存在多条最短路径,
且有
σ
s
v
σ
s
w
∗
σ
s
t
(
w
)
\frac{\sigma_{sv}}{\sigma_{sw}} * \sigma_{st}(w)
σswσsv∗σst(w) 表示从节点
s
s
s到任意
t
≠
w
t \neq w
t=w的节点的所有最短路径中,包含了节点
w
w
w, 节点
v
v
v以及边
{
v
,
w
}
\{v,w\}
{v,w}的最短路径数量,其中
w
w
w满足
v
∈
P
s
(
w
)
v\in P_s(w)
v∈Ps(w)。
那么可以得到以下公式:
δ s ∗ ( v ) = ∑ w : v ∈ P s ( w ) σ s v σ s w ( 1 + δ s ∗ ( w ) ) ( 3 ) \delta_{s*}(v)= \sum\limits_{w:v\in P_s(w)} \frac{\sigma_{sv}}{\sigma_{sw}}(1 + \delta_{s*}(w))\qquad\qquad (3) δs∗(v)=w:v∈Ps(w)∑σswσsv(1+δs∗(w))(3)
因为公式(3)的证明较为繁琐,所以这里就不再说明,感兴趣的同学可以自己去看原论文。
所以,Brandes算法实际上就是先使用BFS对每个节点做最短路径的计算,计算每个节点到其它节点的最短路径数量,然后同时记录了每个节点的前驱节点;接下来就是通过公式(3)在BFS树上递推计算得到每个节点的中介中心性。
其它基于Brandes算法的中介中心性算法实现
正如之前提到的,Brandes算法因为其优异的性能,其也被大量的开源框架所运用。例如经典的开源Python图计算框架NetworkX,其内部的中介中心性算法也是基于Brandes算法实现。虽然Brandes算法将计算复杂度下降到了
Θ
(
∣
V
∣
∣
E
∣
)
\Theta(|V||E|)
Θ(∣V∣∣E∣),但在对上亿的图数据进行中介中心性计算时,因为单源最短路径计算的效率问题,使得其在工程化落地时同样还是充满了挑战。
因此,NetworkX框架依据《Centrality Estimation in Large Networks》这篇论文中所做的实验,以及论文中提到的Hoeffding不等式的理论依据,提出了基于节点度数,节点和节点之间最大最短路径等多种采样方式,来达到近似计算中介中心性的目的。另外,在基于Spark的开源SparklingGraph图计算框架中,则是参考了Edmonds的《A Space-Efficient Parallel Algorithm for Computing Betweenness Centrality in Distributed Memory》这一篇论文,引入successor set来加快对最短路径累加计算的效率。
中介中心性算法源码解读
接下来,我们将从一个开源框架 SparklingGraph去学习如何用代码实现中介中心性的计算。因为我们平时在处理工业界的图数据时,都是上亿的节点和边,所以这也是为什么去选择一个分布式的图计算框架来讲解。
GitHub : https://github.com/sparkling-graph/sparkling-graph
因为该框架是基于Spark Graphx去实现的图计算方法,所以Louvain算法的开发语言是Scala。这里建议大家具备一些Spark和Scala的基础知识,不然下面的源码解读比较难以理解。
Spark Graphx的官网: https://spark.apache.org/docs/latest/graphx-programming-guide.html
另外,下面的讲解中,提到的代码行数对应的是源码文件中实际的代码行数。
首先,我们进入中介中心度计算的主类,EdmondsBC.scala中的computeBC方法就是中介中心度的核心。该方法主要做了三件事:
Step1. 遍历全图所有节点,并以一个节点作为Source节点,然后使用BFS算法计算以Source节点为开始节点到其它节点的最短路径数量
σ
s
∗
{\sigma_{s*}}
σs∗,相当于是SSSP的计算。
Step2. 根据步骤1计算的最短路径数以及上文的公式(3),从最短路径深度最深的节点开始反推迭代计算每个节点的中介中心性。
Step3. 更新图上所有节点的中介中心性,接着以另一个新的节点作为新的Source节点,重复步骤1和步骤2,并继续更新累加节点的中介中心性。
def computeBC = {
val verticesIds = simpleGraph.vertices.map({ case (vertexId, _) => vertexId }).cache
val verticesIterator = verticesIds.toLocalIterator
var betweennessVector: VertexRDD[Double] = simpleGraph.vertices.mapValues(_ => .0).cache()
verticesIterator.foreach(processedVertex => {
val bfsSP = edmondsBFSProcessor.computeSingleSelectedSourceBFS(simpleGraph, processedVertex)
val computedGraph = bcAggregator.aggregate(bfsSP, processedVertex)
val partialBetweennessVector = computedGraph.vertices.mapValues(_.bc)
val previousBetweennessVector = betweennessVector
betweennessVector = updateBC(betweennessVector, partialBetweennessVector)
// betweennessVector.checkpoint()
betweennessVector.count
previousBetweennessVector.unpersist(false)
bfsSP.unpersist(false)
computedGraph.unpersistVertices(false)
computedGraph.edges.unpersist(false)
})
verticesIds.unpersist(false)
finalize(betweennessVector)
}
step1
我们可以看到下面的这一行代码就是在执行step1计算单源最短路径的任务。
val bfsSP = edmondsBFSProcessor.computeSingleSelectedSourceBFS(simpleGraph, processedVertex)
在下面的代码块中,我们可以发现这里使用的是GraphX框架的pregel消息传播机制来从Source节点向周围相同深度的节点迭代地发送消息。
def computeSingleSelectedSourceBFS(graph: Graph[VD, ED], source: VertexId): Graph[VD, ED] = {
val initGraph = graph.mapVertices((vId, attr) => vPredicate.getInitialData(vId, attr)(source)).cache
val result = initGraph.ops.pregel[MD](processor.initialMessage)(
vPredicate.applyMessages,
processor.sendMessage,
processor.mergeMessages
)
initGraph.unpersist(false)
result
}
通过下面的sendMessage和mergeMessage两个代码段的解读,我们可以知道在传递消息时,记录了并更新了当前节点最短路径的前序节点,当前节点的最短路径距离 d G ( s , v ) d_G(s,v) dG(s,v), 以及当前节点的最短路径数量 σ s v \sigma_{sv} σsv
override def sendMessage(triplet: EdgeTriplet[EdmondsVertex, ED]): Iterator[(VertexId, EdmondsMessage)] = {
def msgIterator(currentVertexId: VertexId) = {
val othAttr = triplet.otherVertexAttr(currentVertexId)
val thisAttr = triplet.vertexAttr(currentVertexId)
if (othAttr.explored) Iterator.empty else Iterator((triplet.otherVertexId(currentVertexId), EdmondsMessage(List(currentVertexId), thisAttr.sigma, thisAttr.depth + 1)))
}
def hasParent(source: VertexId) = triplet.vertexAttr(source).explored
val srcMsg = if (hasParent(triplet.srcId)) msgIterator(triplet.srcId) else Iterator.empty
val dstMsg = if (hasParent(triplet.dstId)) msgIterator(triplet.dstId) else Iterator.empty
srcMsg ++ dstMsg
}
def merge(other: EdmondsMessage): EdmondsMessage = {
require(depth == other.depth)
EdmondsMessage(preds ++ other.preds, sigma + other.sigma, depth)
}
step2
接下来我们回到EdmondsBC.scala中的computeBC方法,下面的代码就是用我们之前step1计算出来的SSSP结果执行step2,更新每个节点的中介中心性。
val computedGraph = bcAggregator.aggregate(bfsSP, processedVertex)
我们再进入到EdmondsBCAggregator.scala中的aggregate方法体中,如下面的代码片段所示,其实内容很简单。首先,使用aggregate方法获得最长的最短路径距离maxDepth, 然后根据maxDepth找到对应的相同最短路径距离节点(depth == maxDepth),因为这些节点没有任何的后续节点,也就是 δ s ∗ ( w ) \delta_{s*}(w) δs∗(w)为0。接着就可以根据公式(3)以及拥有最大最短路径距离的节点的前序节点,反推和计算每个节点的中介中心性。整个反推计算过程就是这段“for (i <- 1 until maxDepth reverse) ” 代码所体现的,从后往前找到最短路径距离依次减少的节点进行累加计算。
def aggregate(graph: Graph[EdmondsVertex, ED], source: VertexId) = {
val maxDepth = graph.vertices.aggregate(0)({ case (depth, (vId, vData)) => Math.max(vData.depth, depth) }, Math.max)
var g = graph
var oldGraph: Option[Graph[EdmondsVertex, ED]] = None
var messages = aggregateMessages(g, maxDepth).cache
messages.count
for (i <- 1 until maxDepth reverse) {
oldGraph = Some(g)
g = applyMessages(g, messages).cache
val oldMessages = messages
messages = aggregateMessages(g, i).cache
messages.count
oldMessages.unpersist(false)
oldGraph.foreach(_.unpersistVertices(false))
oldGraph.foreach(_.edges.unpersist(false))
}
messages.unpersist(false)
g
}
接着我们进入到累加计算的方法aggregateMessages中,整个逻辑其实也就是找到符合当前迭代的最短路径距离的节点,然后根据公式(3)对该节点进行中介中心性的计算(delta),并通过graphx的消息传递机制将delta的值传递给前序节点进行更新。
private def aggregateMessages(graph: Graph[EdmondsVertex, ED], depth: Int) = graph.aggregateMessages[Double](
edgeContext => {
val sender = createAndSendMessage(edgeContext.toEdgeTriplet, depth) _
sender(edgeContext.srcId, edgeContext.sendToDst)
sender(edgeContext.dstId, edgeContext.sendToSrc)
}, _ + _
)
private def createAndSendMessage(triplet: EdgeTriplet[EdmondsVertex, ED], depth: Int)(source: VertexId, f: (Double) => Unit) = {
val attr = triplet.vertexAttr(source)
if (attr.depth == depth) sendMessage(produceMessage(triplet)(source), f)
}
private def produceMessage(triplet: EdgeTriplet[EdmondsVertex, ED])(source: VertexId) = {
val attr = triplet.vertexAttr(source)
val otherAttr = triplet.otherVertexAttr(source)
val delta = (otherAttr.sigma.toDouble / attr.sigma.toDouble) * (1.0 + attr.delta)
if (attr.preds.contains(triplet.otherVertexId(source))) Some(delta) else None
}
private def sendMessage(message: Option[Double], f: (Double) => Unit) = message.foreach(f)
private def applyMessages(graph: Graph[EdmondsVertex, ED], messages: VertexRDD[Double]) =
graph.ops.joinVertices(messages)((vertexId, attr, delta) => {
EdmondsVertex(attr.preds, attr.sigma, attr.depth, delta, delta)
})
step3
步骤3实际上就是EdmondsBC.scala中的computeBC方法里,循环计算所有节点的单源最短路径,然后累加更新节点中介中心性的逻辑。具体更新代码在updateBC方法里,这里比较简单就不展开讲了。
至此,中介中心性算法的原理和源码已经全部介绍完了,我们可以看到Spark的GraphX本质还是MapReduce的计算框架,无论在进行单源最短路径的计算,还是对于中介中心性的反推累加计算,都需要大量的对全图节点进行Map的操作,性能瓶颈也很大,所以还是建议涉及到SSSP的图算法,能不用GraphX框架就尽量不要用。