

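GraphX ships a shortest-path algorithm in the library org.apache.spark.graphx.lib.ShortestPaths. It is often described as Dijkstra's algorithm, but the implementation is a Pregel-style message-passing computation that counts hops, so every edge has length 1 and edge weights are ignored. For every vertex in the graph, it computes the distance to each vertex in a set of target vertices (the landmarks).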


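import org.apache.spark.graphx.lib.ShortestPaths

// Vertex ids of the target vertices; VertexId is an alias for Long
val landmarks = Seq(1, 4).map(_.toLong)
val results = ShortestPaths.run(graph, landmarks)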

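(1) landmarks holds the target vertices: the distance from every vertex of graph to each of them is computed.
(2) The vertices in landmarks are ordinary vertices of graph; only their vertex ids are listed separately.
(3) Seq means that landmarks is a collection, so several targets can be handled in one run.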
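To make the shape of the result concrete, here is a minimal plain-Scala sketch that does not use Spark. It computes the same kind of output (for each vertex, a map from landmark id to hop count) with a breadth-first search that walks edges backwards from each landmark. This is a reference technique chosen for readability, not how GraphX computes it. The edge list and the name HopDistanceSketch are assumptions made up for illustration.

```scala
object HopDistanceSketch {
  type DistMap = Map[Long, Int]

  // Hypothetical directed edges (src -> dst), chosen only for illustration.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (3L, 4L), (5L, 4L))
  val landmarks: Seq[Long] = Seq(1L, 4L)

  // Breadth-first search backwards from one landmark: a vertex found at
  // depth d reaches the landmark in d hops when following edge direction.
  def hopsTo(landmark: Long): Map[Long, Int] = {
    @annotation.tailrec
    def loop(frontier: Set[Long], depth: Int, seen: Map[Long, Int]): Map[Long, Int] =
      if (frontier.isEmpty) seen
      else {
        val updated = seen ++ frontier.map(_ -> depth)
        // Sources of edges that point into the frontier and were not visited yet.
        val next = edges.collect { case (s, d) if frontier(d) && !updated.contains(s) => s }.toSet
        loop(next, depth + 1, updated)
      }
    loop(Set(landmark), 0, Map.empty)
  }

  // One backward search per landmark.
  val perLandmark: Seq[(Long, Map[Long, Int])] = landmarks.map(l => l -> hopsTo(l))

  // For every vertex, keep landmark -> hop count for the landmarks it can reach.
  val vertices: Set[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.toSet
  val result: Map[Long, DistMap] = vertices.map { v =>
    v -> perLandmark.flatMap { case (l, hops) => hops.get(v).map(l -> _).toList }.toMap
  }.toMap
}
```

A landmark that a vertex cannot reach is simply absent from that vertex's map, which is also how an unreachable target shows up in the Map-based representation used in the rest of this post.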







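Suppose we have a graph G with vertex set V and a set of target vertices LANDMARKS. As shown in the figure above, LANDMARKS={1,4}.
(1) Give every vertex v in V a Map that stores its distance to each target T (a vertex in LANDMARKS). If v is itself in LANDMARKS, the Map starts with the single entry v -> 0; otherwise it starts empty.
(2) Activate all vertices of G (shown in red in the figure above). Every v then explores G simultaneously, starting from itself, to fill in its own Map.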
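The initial state described above can be sketched in a few lines of plain Scala. The vertex ids 1 to 5 and the names InitSketch and initialMap are assumptions for illustration; only LANDMARKS={1,4} comes from the text.

```scala
object InitSketch {
  type DistMap = Map[Long, Int]

  // A landmark knows its distance to itself (0); every other vertex starts empty.
  def initialMap(v: Long, landmarks: Set[Long]): DistMap =
    if (landmarks.contains(v)) Map(v -> 0) else Map.empty

  val landmarks: Set[Long] = Set(1L, 4L)

  // Hypothetical vertex ids 1..5.
  val initial: Map[Long, DistMap] =
    (1L to 5L).map(v => v -> initialMap(v, landmarks)).toMap
}
```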

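(1) Process all active vertices. In practice this means processing every S->T (vertex-edge->vertex) triplet in which S or T is active.
(2) For each S->T triplet, take T's Map (written T(Map)), add 1 to every value in it, and merge the result with S's Map (written S(Map)), keeping the smaller value for each key. If the merged result differs from S(Map), the incremented T(Map) is sent to S as a message. If it is equal, S would learn nothing new and no message is sent.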
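The message rule above can be tried out without Spark. The following is a minimal sketch under stated assumptions: the edge list is hypothetical, the names MessagePassingSketch, increment, merge, message and iterate are made up here, and each round simply checks every triplet instead of tracking which vertices are active. It is meant to show the rule reaching a fixed point, not to mirror the Pregel runtime.

```scala
object MessagePassingSketch {
  type DistMap = Map[Long, Int]

  // Add one hop to every distance known at the destination T.
  def increment(m: DistMap): DistMap = m.map { case (k, d) => k -> (d + 1) }

  // Key-wise minimum of two maps; a key present on one side only keeps that value.
  def merge(a: DistMap, b: DistMap): DistMap =
    (a.keySet ++ b.keySet).map { k =>
      k -> math.min(a.getOrElse(k, Int.MaxValue), b.getOrElse(k, Int.MaxValue))
    }.toMap

  // Message from T to S for one S->T triplet, or None when S would learn nothing new.
  def message(sMap: DistMap, tMap: DistMap): Option[DistMap] = {
    val candidate = increment(tMap)
    if (merge(candidate, sMap) != sMap) Some(candidate) else None
  }

  // Hypothetical directed edges (S -> T) and landmarks, for illustration only.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (3L, 4L), (5L, 4L))
  val landmarks: Set[Long] = Set(1L, 4L)

  // Run rounds until no triplet produces a message.
  @annotation.tailrec
  def iterate(state: Map[Long, DistMap]): Map[Long, DistMap] = {
    // Collect the messages of this round and combine those addressed to the same vertex.
    val inbox: Map[Long, DistMap] = edges
      .flatMap { case (s, t) => message(state(s), state(t)).map(s -> _).toList }
      .groupBy(_._1)
      .map { case (s, msgs) => s -> msgs.map(_._2).reduce(merge) }
    if (inbox.isEmpty) state
    else iterate(state.map { case (v, m) => v -> inbox.get(v).fold(m)(merge(m, _)) })
  }

  // Landmarks start with distance 0 to themselves, all other vertices with an empty map.
  val initial: Map[Long, DistMap] =
    edges.flatMap { case (s, t) => Seq(s, t) }.distinct
      .map(v => v -> (if (landmarks(v)) Map(v -> 0) else Map.empty[Long, Int])).toMap

  val result: Map[Long, DistMap] = iterate(initial)
}
```

With these edges the loop stops after a round in which no triplet yields a message. Messages only travel from T back to S, so a vertex learns a distance to a landmark only if it can reach that landmark by following edge direction.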


package com.edata.bigdata.algorithm.networks

import org.apache.spark.graphx.{EdgeTriplet, Graph, Pregel, VertexId}

import scala.reflect.ClassTag

/**
 * @author: Alan Sword
 * @description: Compute node connectivity between all pairs (or a subset) of nodes.
 */

object AllPairNodeConnectivity extends Serializable {

  /**
   * @description: Map type used as the attribute of each vertex (target vertex id -> distance in hops)
   */
  type APNCMap = Map[VertexId, Int]

  /**
   * @param x : (vertex id, distance) pairs used to create an APNCMap
   * @description: build an APNCMap from the given pairs
   * @return : an APNCMap
   */
  private def makeAPNCMap(x: (VertexId, Int)*) = Map(x: _*)

  /**
   * @param apncmap : the APNCMap to update
   * @description: update the APNCMap by adding 1 to each distance
   * @return the updated APNCMap
   */
  private def updateAPNCMap(apncmap: APNCMap): APNCMap = apncmap.map { case (v, d) => v -> (d + 1) }

  /**
   * @param apncmap1 : the first APNCMap to merge
   * @param apncmap2 : the second APNCMap to merge
   * @description: take the union of both key sets and, for each key, keep the smaller of the two distances
   * @return the merged APNCMap
   */
  private def mergeAPNCMap(apncmap1: APNCMap, apncmap2: APNCMap): APNCMap = {
    (apncmap1.keySet ++ apncmap2.keySet).map {
      k => k -> math.min(apncmap1.getOrElse(k, Int.MaxValue), apncmap2.getOrElse(k, Int.MaxValue))
    }.toMap
  }

  /**
   * @param id   : vertex id
   * @param attr : the current APNCMap attribute of the vertex
   * @param msg  : the message received by the vertex
   * @description: called when a vertex receives a message; merges the vertex's current attribute with the APNCMap carried by the message
   * @return the merged APNCMap, which becomes the new vertex attribute
   */
  def vertexProgram(id: VertexId, attr: APNCMap, msg: APNCMap): APNCMap = {
    mergeAPNCMap(attr, msg)
  }

  /**
   * @param edge : the triplet ( S->T ) in the graph
   * @description: call updateAPNCMap on T's attribute, then merge the result with S's attribute; a message is sent to S only if the merge would change S's attribute
   * @return an iterator with one message for S, or an empty iterator
   */
  def sendMessage(edge: EdgeTriplet[APNCMap, _]): Iterator[(VertexId, APNCMap)] = {
    val newAttr = updateAPNCMap(edge.dstAttr)
    if (edge.srcAttr != mergeAPNCMap(newAttr, edge.srcAttr))
      Iterator((edge.srcId, newAttr))
    else
      Iterator.empty
  }

  /**
   * @param graph     : every vertex in graph is treated as a starting vertex
   * @param landmarks : every vertex in landmarks is treated as a target vertex
   * @tparam VD : the type of the vertex attributes
   * @tparam ED : the type of the edge attributes
   * @description: The main entry point, consisting of several steps:
   *               1. Initialization: initialize the APNCMap attribute of each vertex and activate all vertices.
   *               2. Sending messages: for each triplet that contains an active vertex, decide whether a message needs to be sent; if not, the related vertex becomes inactive.
   *               3. Receiving messages: receive the message and activate the related vertex.
   * @return a graph whose vertex attributes are the APNCMaps of distances to the landmarks
   */
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[APNCMap, ED] = {
    // Initialization: build the APNCMap attribute of each vertex
    val APNCGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) makeAPNCMap(vid -> 0) else makeAPNCMap()
    }
    // Every vertex receives this empty message and is thereby activated
    val initialMessage = makeAPNCMap()
    // sendMessage decides, for each triplet that contains an active vertex, whether a message is needed;
    // vertexProgram merges a received message into the vertex attribute and keeps the vertex active
    Pregel(APNCGraph, initialMessage)(vertexProgram, sendMessage, mergeAPNCMap)
  }
}


