如何使用 GraphX 的 Java API 获取图的连接组件列表

问题描述

我对 spark 和 GraphX 还很陌生,我正在尝试了解如何使用 GraphX 的 Java API 执行以下操作。我正在寻找一种具有以下签名的方法

private <List<Graph<VD,ED>> computeConnectedComponents(Graph<VD,ED> graph){}

其中,给定一个只有正度节点的图,但连接组件的数量未知,它应该返回一个图的列表(顺序无关紧要),其中每个图都是连接的。

我知道 GraphOps.connectedComponents()ConnectedComponents.run(),但我很难理解返回值。文档将它们列为返回 Graph<Object,ED> 的图,并说明返回的“最低顶点 ID”。

基本上,我想知道如何从 connectedComponents 的返回值和我的初始图导出这个图列表。

解决方法

下面的代码是在 Scala 中的,但应该展示这个想法。

返回的图会包含所有的顶点,但是每个顶点的属性被替换为一个VertexId(实际上只是一个Long),可以解释为该顶点所属的连通分量的id。它也是属于那个连通分量的“最低顶点 id”。

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val vertexArray = Array(
  (1L,("A",28)),(2L,("B",27)),(3L,("C",65)),(4L,("D",42)),(5L,("E",55)),(6L,("F",50)),(7L,("G",53)),(8L,("H",66))
  )

// Vertices 1 - 6 are connected,7 and 8 are connected.
val edgeArray = Array(
  Edge(2L,1L,7),Edge(2L,4L,2),Edge(3L,2L,4),6L,3),Edge(4L,1),Edge(5L,3L,8),Edge(7L,8L,3)
  )

val vertexRDD: RDD[(Long,(String,Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(String,Int),Int] = Graph(vertexRDD,edgeRDD)

val cc = graph.connectedComponents().vertices.collectAsMap()
cc.foreach {
  case (vertexId,clusterId) =>
    println(s"Vertex $vertexId belongs to cluster $clusterId")
}

输出:

Vertex 8 belongs to cluster 7
Vertex 2 belongs to cluster 1
Vertex 5 belongs to cluster 1
Vertex 4 belongs to cluster 1
Vertex 7 belongs to cluster 7
Vertex 1 belongs to cluster 1
Vertex 3 belongs to cluster 1
Vertex 6 belongs to cluster 1