Pregel API - 为什么小图上的迭代会消耗这么多内存?

问题描述

我对 Spark 和 Scala 比较陌生,但是我决定在这里发布一个非常简单的代码示例,在我看来应该不会造成严重问题,但实际上它会导致内存不足错误通常在 AWS EMR Spark 环境中取决于 maxIterations 的值:

import java.net.URI

import org.apache.hadoop.fs.{FileSystem,FileUtil,Path}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.graphx._

import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem,Path}
import org.apache.hadoop.io.IoUtils
import java.io.IOException

val config = new SparkConf().setAppName("test graphx")
config.set("spark.driver.allowMultipleContexts","true")
val batch_id=new Integer(31)

val maxIterations=2 //200 interations are causing out of memory

var myVertices = sc.makeRDD(Array( (1L,("A",batch_id,0.0,11.0)),(2L,("B",1000.0,300.0)),(3L,( "C",8.0)),(4L,("D",400.0)) ))
var myEdges = sc.makeRDD(Array(Edge(4L,3L,(7.7,0.0) ),Edge(2L,(5.0,1L,(12.0,0.0))))

var myGraph=Graph(myVertices,myEdges)
    myGraph.cache
    
myGraph.triplets.foreach(println)

//we need to calculate some constant values for each edge before start of pregel
val initGraph=myGraph.mapTriplets(tr => 
    (tr.attr._1,(tr.attr._1 * 
                    (scala.math.sqrt((tr.dstAttr._3-tr.srcAttr._3)*(tr.dstAttr._3-tr.srcAttr._3)+( tr.dstAttr._4-tr.srcAttr._4)*( tr.dstAttr._4-tr.srcAttr._4)+(tr.dstAttr._5-tr.srcAttr._5)*(tr.dstAttr._5-tr.srcAttr._5))) *
                    (scala.math.sqrt((tr.dstAttr._3-tr.srcAttr._3)*(tr.dstAttr._3-tr.srcAttr._3)+( tr.dstAttr._4-tr.srcAttr._4)*( tr.dstAttr._4-tr.srcAttr._4)+(tr.dstAttr._5-tr.srcAttr._5)*(tr.dstAttr._5-tr.srcAttr._5))) /
                    (tr.dstAttr._6 * tr.dstAttr._6)) 
    )
    
)

initGraph.triplets.take(100).foreach(println)

val distanceStep = 0.1
val tolerance = 1


val sssp = initGraph.pregel( (0.0,0.0),maxIterations //500-3000
 )(
  (id: VertexId,vert: ((String,Integer,Double,Double)),msg: (Double,Double)) =>
    (
      vert._1,vert._2,( if (scala.math.abs(msg._1)> tolerance) {vert._3+distanceSteP*msg._1 } else { vert._3 }),( if (scala.math.abs(msg._2)> tolerance) {vert._4+distanceSteP*msg._2 } else { vert._4 }),( if (scala.math.abs(msg._3)> tolerance) {vert._5+distanceSteP*msg._3 } else { vert._5 }),vert._6
    ),// Vertex Program
  e => {  // Send Message
    Iterator(
      (
        e.dstId,(
          ((e.srcAttr._3 - e.dstAttr._3)*distanceSteP*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )),//x
          ((e.srcAttr._4 - e.dstAttr._4)*distanceSteP*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )),//y
          ((e.srcAttr._5 - e.dstAttr._5)*distanceSteP*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )),//z
          e.attr._1*distanceSteP*scala.math.sqrt((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) //vector module
        )
      )
    )
  },{
    (a,b) => (a._1 + b._1,a._2 + b._2,a._3 + b._3,0) // Merge Message
  }
)

sssp.vertices.take(10).foreach(println)

我通过 Zeppelin 在 4 节点 m5.x2large 集群上的 AWS EMR 中运行它,但是它可以被快速采用并作为 Spark 中的作业执行。

简而言之,这段代码创建了一个具有 4 个顶点和 3 个边的 myGraph 图。然后对于每个三元组,我计算一些常量值并为此使用图形对象 initGraph

然后对于 initGraph,我应用了 pregel API,它的执行仅受迭代次数 maxIterations 的限制。而此时对于 pregel API,我看到了奇怪的行为。对于较小的 maxIterations 值(小于 10),它工作得非常快,对于 100-150 次迭代,它在 zeppelin 中执行 3-4 分钟,而对于 200 次迭代,它会因不同的错误(ConnectionClosed 等)而失败。

在设置 maxIterations=150 或 200 后,我尝试监控集群的运行情况,看起来像这样

enter image description here

分配的内存直线上升,可用内存以相同的速度减少。

由于我对 Spark 很陌生,我不确定这是正确的行为,而且老实说,即使在如此小的图形上进行 200 次预凝胶迭代,我也无法找到可能消耗千兆字节内存的解释。如果你能在你的最后重现它并检查,我很想听听你关于性能优化的建议,因为如果我扩展集群并在更大的硬件设置上运行相同的代码,它只是一个问题 {{1 }} 和图的大小实际上得到相同的 OutOfMemory 错误。我需要运行的实际图形超过 1M 个顶点和约 7M 个边,所以我无法弄清楚如果这个问题无法解决,它可能需要什么样的硬件。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)