Spark 2.3:如何在迭代算法中从内存中释放RDD

问题描述

https://livebook.manning.com/book/spark-graphx-in-action/chapter-6/1获取示例代码

import org.apache.spark.graphx._

def dijkstra[VD](g:Graph[VD,Double],origin:VertexId) = {
    var g2 = g.mapVertices(
        (vid,vd) => (false,if (vid == origin) 0.0 else Double.MaxValue,List[VertexId]()))
    for (i <- 1L to g.vertices.count-1) {
        val currentVertexId =
            g2.vertices.filter(!_._2._1) 
                .fold((0L,(false,Double.MaxValue,List[VertexId]())))((a,b) =>
                    if (a._2._2 < b._2._2) a else b)
                ._1
    val newdistances = g2.aggregateMessages[(Double,List[VertexId])]( 
        ctx => if (ctx.srcId == currentVertexId) 
            ctx.sendToDst((
                ctx.srcAttr._2 + ctx.attr,ctx.srcAttr._3 :+ ctx.srcId)),(a,b) => if (a._1 < b._1) a else b)
    g2 = g2.outerJoinVertices(newdistances)((vid,vd,newSum) => {
        val newSumVal =
            newSum.getorElse((Double.MaxValue,List[VertexId]())) 
        (vd._1 || vid == currentVertexId,math.min(vd._2,newSumVal._1),if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)})
    }
    g.outerJoinVertices(g2.vertices)((vid,dist) =>
        (vd,dist.getorElse((false,List[VertexId]())).productIterator.toList.tail))
}

val myVertices = spark.sparkContext.makeRDD(Array((1L,"A"),(2L,"B"),(3L,"C"),(4L,"D"),(5L,"E"),(6L,"F"),(7L,"G")))
val myEdges = spark.sparkContext.makeRDD(Array(Edge(1L,2L,7.0),Edge(1L,4L,5.0),Edge(2L,3L,8.0),9.0),5L,Edge(3L,Edge(4L,15.0),6L,6.0),Edge(5L,7L,Edge(6L,11.0)))
val myGraph = Graph(myVertices,myEdges)

val result = dijkstra(myGraph,1L)

result.vertices.map(_._2).collect

每次我运行这段代码时,VertexRDD 都会留在内存中,我无法释放它。

enter image description here

看起来 GraphX 正在缓存图形数据,即使它没有在代码中指定。是否可以从内存中释放上一次运行的 RDD 数据?

我试图通过做result.unpersist()result.vertices.unpersist()result.edges.unpersist()甚至result.checkpoint()来不坚持。

最终,我想在 for 循环中运行代码以找到不同 origin 的多个结果,除非我能弄清楚如何从以前释放 RDD,否则我会遇到内存问题。

更新: 我想出了一个蛮力方法来清除所有 VertexRDD 和 EdgeRDD

for ((k,v) <- spark.sparkContext.getPersistentRDDs) {
  val convertedToString = v.toString()
  if (convertedToString.contains("VertexRDD") || convertedToString.contains("EdgeRDD")) {
      v.unpersist()
  }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)