How to properly create a GraphX graph with node and edge attributes

Problem description

I am running a Scala program in a Jupyter notebook with the spylon kernel, performing some operations on a network.

After some preprocessing, I end up with two DataFrames, one for the nodes and one for the edges, as shown below:

For the nodes:

+---+--------------------+-------+--------+-----+
| id|                node|trip_id| stop_id| time|
+---+--------------------+-------+--------+-----+
|  0|0b29d98313189b650...| 209518|u0007405|56220|
|  1|45adb49a23257198e...| 209518|u0007409|56340|
|  2|fe5f4e2dc48b97f71...| 209518|u0007406|56460|
|  3|7b32330b6fe10b073...| 209518|u0007407|56580|
+---+--------------------+-------+--------+-----+
only showing top 4 rows

vertices_complete: org.apache.spark.sql.DataFrame = [id: bigint, node: string ... 3 more fields]

For the edges:

+------+-----+----+------+------+---------+---------+--------+
|   src|  dst|time|method|weight|walk_time|wait_time|bus_time|
+------+-----+----+------+------+---------+---------+--------+
| 65465|52067|2640|  walk|2640.0|     1112|     1528|       0|
| 68744|52067|1740|  walk|1740.0|      981|      759|       0|
| 55916|52067|2700|  walk|2700.0|     1061|     1639|       0|
|124559|52067|1440|  walk|1440.0|     1061|      379|       0|
| 23036|52067|1800|  walk|1800.0|     1112|      688|       0|
+------+-----+----+------+------+---------+---------+--------+
only showing top 5 rows

edges_DF: org.apache.spark.sql.DataFrame = [src: bigint, dst: bigint ... 6 more fields]

I want to create a Graph object from these so that I can run PageRank, find shortest paths, and so on. To that end, I converted the DataFrames to RDDs:

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Edge, Graph, VertexId}

val verticesRDD: RDD[(VertexId, (String, Long, String, Long))] = vertices_complete.rdd
    .map(row =>
        (row.getAs[Long](0),
         (row.getAs[String]("node"), row.getAs[Long]("trip_id"),
          row.getAs[String]("stop_id"), row.getAs[Long]("time"))))

val edgesRDD: RDD[Edge[Long]] = edges_DF.rdd
    .map(row =>
        Edge(row.getAs[Long]("src"), row.getAs[Long]("dst"), row.getAs[Long]("weight")))

val my_graph = Graph(verticesRDD, edgesRDD)
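Since `Row.getAs[T]` merely casts the boxed column value to `T`, it succeeds only when `T` matches the column's actual Spark SQL type. A debugging step worth adding at this point (not part of the original post) is to print both schemas and compare them with the types the code requests:

```scala
// Show the runtime column types that getAs[T] must match exactly.
vertices_complete.printSchema()
edges_DF.printSchema()
// If weight is reported as "double" (its sample values print as 2640.0,
// 1740.0, ...), then row.getAs[Long]("weight") will fail with a
// ClassCastException once a task actually executes.
```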

Any operation, even just PageRank (I also tried shortest paths; the error persists):

val ranks = my_graph.pageRank(0.0001).vertices

throws the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 233.0 failed 1 times, most recent failure: Lost task 5.0 in stage 233.0 (TID 9390, DESKTOP-A7EPMQG.mshome.net, executor driver): java.lang.ClassCastException

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2209)
  at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
  at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
  at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:140)
  at org.apache.spark.graphx.lib.PageRank$.runUntilConvergenceWithOptions(PageRank.scala:431)
  at org.apache.spark.graphx.lib.PageRank$.runUntilConvergence(PageRank.scala:346)
  at org.apache.spark.graphx.GraphOps.pageRank(GraphOps.scala:380)
  ... 40 elided
Caused by: java.lang.ClassCastException

I suspect something is wrong with the initialization of the RDD objects (I would also like to add the other attributes [time, walk_time, etc.] to the edges, in addition to weight), but I cannot figure out how to do this correctly. Any help?

Solution
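A plausible diagnosis, inferred from the sample output rather than confirmed by the original poster: `weight` prints as `2640.0`, i.e. it is a `double` column, so `row.getAs[Long]("weight")` throws `java.lang.ClassCastException` inside the tasks; the same would happen for any vertex column whose type is `int` rather than `bigint`. A minimal sketch that casts every column to the exact type `getAs` will request before converting to RDDs:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.sql.functions.col

// Cast every column to the exact type that getAs will request, so the
// boxed values really are java.lang.Long / String at runtime.
val verticesFixed = vertices_complete.select(
  col("id").cast("long"),
  col("node"),
  col("trip_id").cast("long"),
  col("stop_id"),
  col("time").cast("long"))

val edgesFixed = edges_DF.select(
  col("src").cast("long"),
  col("dst").cast("long"),
  col("weight").cast("long"))  // weight prints as 2640.0, i.e. a double column

val verticesRDD: RDD[(VertexId, (String, Long, String, Long))] = verticesFixed.rdd
  .map(row =>
    (row.getAs[Long]("id"),
     (row.getAs[String]("node"), row.getAs[Long]("trip_id"),
      row.getAs[String]("stop_id"), row.getAs[Long]("time"))))

val edgesRDD: RDD[Edge[Long]] = edgesFixed.rdd
  .map(row =>
    Edge(row.getAs[Long]("src"), row.getAs[Long]("dst"), row.getAs[Long]("weight")))

val my_graph = Graph(verticesRDD, edgesRDD)
val ranks = my_graph.pageRank(0.0001).vertices  // should no longer hit the cast error
```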

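To attach the remaining attributes (`time`, `method`, `walk_time`, `wait_time`, `bus_time`) to the edges as well, the edge attribute can be a case class instead of a bare `Long`; GraphX accepts any edge attribute type, and `pageRank` does not use it. A sketch under the same assumptions about column types, reusing `verticesRDD` from above (`EdgeAttr` is a name introduced here for illustration):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical bundle of the per-edge columns; any Serializable type works.
case class EdgeAttr(time: Long, method: String, weight: Double,
                    walkTime: Long, waitTime: Long, busTime: Long)

val richEdgesRDD: RDD[Edge[EdgeAttr]] = edges_DF.rdd.map { row =>
  Edge(
    row.getAs[Long]("src"),
    row.getAs[Long]("dst"),
    EdgeAttr(
      row.getAs[Long]("time"),      // assumes these columns are bigint;
      row.getAs[String]("method"),  // cast them first (as above) if the
      row.getAs[Double]("weight"),  // schema says int or double
      row.getAs[Long]("walk_time"),
      row.getAs[Long]("wait_time"),
      row.getAs[Long]("bus_time")))
}

val richGraph = Graph(verticesRDD, richEdgesRDD)
val ranks = richGraph.pageRank(0.0001).vertices  // pageRank ignores the edge attribute
```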