apache spark graphx-从sql表创建VertexRDD

问题描述

我有要在Spark中加载到Dataframe的表,它具有以下架构:

verticesDf.printSchema

root
 |-- id: integer (nullable = true)
 |-- target: string (nullable = true)
 |-- batch_id: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
 |-- size: double (nullable = true)

如何将其转换为VertexRDD,以便以后可以使用它构建图?

我正在尝试以下操作:

case class SRow( target:String,batch_id:Double,x:Double,y:Double,z:Double,size:Double)
val sourceDS: Dataset[(VertexId,SRow)] = verticesDf.as[(VertexId,SRow)]
val vertVX=VertexRDD(sourceDS)

但是这和许多其他方法都没有给出结果-我总是遇到一些类型不匹配的情况。正确的方法是什么?

解决方法

至少,要创建一个图形,您需要两个RDD。类型RDD[(VertexId,VD)]之一,包含顶点。 VertexId只不过是LongVD可以是任何东西,例如您的Srow类。另一个RDD的类型为RDD[Edge[ED]],其中ED类似于VD可以是任何东西。

在这里,您将讨论vextex RDD的创建。您正在尝试将数据框转换为Dataset[(VertexId,SRow)]类型的数据集。它不起作用有两个原因。 id是一个整数而不是一个长整数,并且结构错误。您的数据框包含两列以上。

这是方法:

val vertices = verticesDf
    .select(
       // we transform the id to a long
       'id cast "long",// we create a struct with the other columns that will be turned into a Srow
       struct(verticesDf.columns.tail.map(col) : _*))
    .as[(Long,SRow)]

// we also need edges,let's create a dummy RDD
val edges = sc.parallelize(Seq(Edge(1L,2L,"test")))

// And voila
val graph: Graph[SRow,String] = Graph(vertices.rdd,edges)

请注意,最后一行是根据RDD(而非数据集)创建图形的,因此我们需要对顶点进行转换。