Problem description
I'm running a Jupyter notebook with the spylon kernel, using Scala to perform some operations on a network.
After some preprocessing I end up with two DataFrames, one for the nodes and one for the edges, as shown below:
For the nodes:
+---+--------------------+-------+--------+-----+
| id| node|trip_id| stop_id| time|
+---+--------------------+-------+--------+-----+
| 0|0b29d98313189b650...| 209518|u0007405|56220|
| 1|45adb49a23257198e...| 209518|u0007409|56340|
| 2|fe5f4e2dc48b97f71...| 209518|u0007406|56460|
| 3|7b32330b6fe10b073...| 209518|u0007407|56580|
+---+--------------------+-------+--------+-----+
only showing top 4 rows
vertices_complete: org.apache.spark.sql.DataFrame = [id: bigint, node: string ... 3 more fields]
For the edges:
+------+-----+----+------+------+---------+---------+--------+
| src| dst|time|method|weight|walk_time|wait_time|bus_time|
+------+-----+----+------+------+---------+---------+--------+
| 65465|52067|2640| walk|2640.0| 1112| 1528| 0|
| 68744|52067|1740| walk|1740.0| 981| 759| 0|
| 55916|52067|2700| walk|2700.0| 1061| 1639| 0|
|124559|52067|1440| walk|1440.0| 1061| 379| 0|
| 23036|52067|1800| walk|1800.0| 1112| 688| 0|
+------+-----+----+------+------+---------+---------+--------+
only showing top 5 rows
edges_DF: org.apache.spark.sql.DataFrame = [src: bigint, dst: bigint ... 6 more fields]
I want to create a Graph object from these so that I can run PageRank, find shortest paths, and so on. To that end I converted them to RDDs:
val verticesRDD: RDD[(VertexId, (String, Long, String, Long))] = vertices_complete.rdd
  .map(row =>
    (row.getAs[Long](0),
      (row.getAs[String]("node"), row.getAs[Long]("trip_id"),
       row.getAs[String]("stop_id"), row.getAs[Long]("time"))))

val edgesRDD: RDD[Edge[Long]] = edges_DF.rdd
  .map(row =>
    Edge(row.getAs[Long]("src"), row.getAs[Long]("dst"), row.getAs[Long]("weight")))

val my_graph = Graph(verticesRDD, edgesRDD)
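(A debugging note, not from the original post: `Row.getAs[T]` is just a cast, so a mismatch between a column's actual Spark type and the requested `T` only surfaces later, when an action forces the lazy RDD to be evaluated. Printing the schemas before the conversion makes such mismatches visible; for instance, `weight` prints as `2640.0` in the sample rows above, which suggests it may be a `DoubleType` column rather than a `bigint`.)

```scala
// Inspect the actual column types before calling getAs[T].
// Reading a DoubleType column with getAs[Long] (or an IntegerType
// column with getAs[Long]) throws ClassCastException only when a
// job runs, because the .map over the RDD is lazy.
vertices_complete.printSchema()
edges_DF.printSchema()
```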
Any operation on the graph, even just PageRank (I also tried shortest paths; the error persists):
val ranks = my_graph.pageRank(0.0001).vertices
throws the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 233.0 failed 1 times, most recent failure: Lost task 5.0 in stage 233.0 (TID 9390, DESKTOP-A7EPMQG.mshome.net, executor driver): java.lang.ClassCastException
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2209)
at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:140)
at org.apache.spark.graphx.lib.PageRank$.runUntilConvergenceWithOptions(PageRank.scala:431)
at org.apache.spark.graphx.lib.PageRank$.runUntilConvergence(PageRank.scala:346)
at org.apache.spark.graphx.GraphOps.pageRank(GraphOps.scala:380)
... 40 elided
Caused by: java.lang.ClassCastException
I suspect something is wrong with how the RDD objects are initialized (I would also like to attach the other attributes [time, walk_time, etc.] to the edges, not just weight), but I can't figure out how to do this correctly. Any help?
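For reference, a sketch of one way the conversion could be written, under two assumptions drawn from the sample output above: that `weight` is actually a `DoubleType` column (its values print as `2640.0`), and that the remaining numeric edge columns are `bigint` like `src`/`dst` (if they are `IntegerType`, they would need `getAs[Int]` instead). The `EdgeAttr` case class is a hypothetical name introduced here to carry all edge attributes, not just weight:

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Hypothetical container bundling every edge attribute.
case class EdgeAttr(time: Long, method: String, weight: Double,
                    walkTime: Long, waitTime: Long, busTime: Long)

val verticesRDD: RDD[(VertexId, (String, Long, String, Long))] =
  vertices_complete.rdd.map { row =>
    (row.getAs[Long]("id"),
      (row.getAs[String]("node"), row.getAs[Long]("trip_id"),
       row.getAs[String]("stop_id"), row.getAs[Long]("time")))
  }

// weight is read as Double here (assumption based on the sample rows);
// reading a DoubleType column with getAs[Long] is a classic source of
// the deferred ClassCastException seen in the stack trace above.
val edgesRDD: RDD[Edge[EdgeAttr]] = edges_DF.rdd.map { row =>
  Edge(row.getAs[Long]("src"), row.getAs[Long]("dst"),
    EdgeAttr(row.getAs[Long]("time"), row.getAs[String]("method"),
      row.getAs[Double]("weight"), row.getAs[Long]("walk_time"),
      row.getAs[Long]("wait_time"), row.getAs[Long]("bus_time")))
}

val myGraph: Graph[(String, Long, String, Long), EdgeAttr] =
  Graph(verticesRDD, edgesRDD)
```

`pageRank` ignores the edge attribute type, so a richer `Edge[EdgeAttr]` does not interfere with it, while shortest-path-style computations can read the extra fields from `EdgeAttr`.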