问题描述
我有要在Spark中加载到Dataframe的表,它具有以下架构:
verticesDf.printSchema
root
|-- id: integer (nullable = true)
|-- target: string (nullable = true)
|-- batch_id: integer (nullable = true)
|-- x: double (nullable = true)
|-- y: double (nullable = true)
|-- z: double (nullable = true)
|-- size: double (nullable = true)
如何将其转换为VertexRDD,以便以后可以使用它构建图?
我正在尝试以下操作:
case class SRow( target:String,batch_id:Double,x:Double,y:Double,z:Double,size:Double)
val sourceDS: Dataset[(VertexId,SRow)] = verticesDf.as[(VertexId,SRow)]
val vertVX=VertexRDD(sourceDS)
但是这和许多其他方法都没有给出结果-我总是遇到一些类型不匹配的情况。正确的方法是什么?
解决方法
至少,要创建一个图形,您需要两个RDD。类型RDD[(VertexId,VD)]
之一,包含顶点。 VertexId
只不过是Long
,VD
可以是任何东西,例如您的Srow
类。另一个RDD的类型为RDD[Edge[ED]]
,其中ED
类似于VD
可以是任何东西。
在这里,您将讨论vextex RDD的创建。您正在尝试将数据框转换为Dataset[(VertexId,SRow)]
类型的数据集。它不起作用有两个原因。 id
是一个整数而不是一个长整数,并且结构错误。您的数据框包含两列以上。
这是方法:
val vertices = verticesDf
.select(
// we transform the id to a long
'id cast "long",// we create a struct with the other columns that will be turned into a Srow
struct(verticesDf.columns.tail.map(col) : _*))
.as[(Long,SRow)]
// we also need edges,let's create a dummy RDD
val edges = sc.parallelize(Seq(Edge(1L,2L,"test")))
// And voila
val graph: Graph[SRow,String] = Graph(vertices.rdd,edges)
请注意,最后一行是根据RDD(而非数据集)创建图形的,因此我们需要对顶点进行转换。