如果从多个MapR位置读取数据,火花洗牌如何工作?

问题描述

我在MapR集群中有2个位置,而我的Spark作业正在从这2个端点加载数据。 端点之一拥有大量数据,而其他端点则相对较少。 现在,当执行.reduceByKey.groupByKey之类的随机操作时,出现OOM异常:

"java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.spark.broadcast.Torrentbroadcast$$anonfun$3.apply(Torrentbroadcast.scala:286)
    at org.apache.spark.broadcast.Torrentbroadcast$$anonfun$3.apply(Torrentbroadcast.scala:286)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
    at com.esotericsoftware.kryo.io.Output.close(Output.java:196)
    at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:255)
    at org.apache.spark.broadcast.Torrentbroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(Torrentbroadcast.scala:293)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1368)
    at org.apache.spark.broadcast.Torrentbroadcast$.blockifyObject(Torrentbroadcast.scala:292)
    at org.apache.spark.broadcast.Torrentbroadcast.writeBlocks(Torrentbroadcast.scala:127)
    at org.apache.spark.broadcast.Torrentbroadcast.<init>(Torrentbroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentbroadcastFactory.newbroadcast(TorrentbroadcastFactory.scala:34)
    at org.apache.spark.broadcast.broadcastManager.newbroadcast(broadcastManager.scala:62)
    at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:810)
    at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
    at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
"

现在,如果我将数据从1个位置复制到另一个位置,然后执行随机操作,则不会出现任何超时异常。 为什么会有这种火花现象?据我了解,随机操作将在rdd上进行,因此无论它从n个位置读取数据,其行为都应相似。

如果我的理解是错误的,请纠正我。

解决方法

问题的某些方面令人困惑。

首先,您指的是在MapR集群的“ 2个位置”中包含数据。您是指两个目录中的数据吗?还是真的是两个位置?还是说您实际上有两个集群?

另一个令人困惑的问题是,您显示了内存不足问题的堆栈跟踪,但是随后您谈到某种超时。您实际上有什么问题?

通常,内存不足异常对数据源的依赖性不大,但与资源的过度使用有很大关系。如果您有很多任务使用的内存或CPU数量比可能少得多,但有些任务使用的内存更多,那么过量使用可能会很有用。过度分配资源可使您安排此类程序,但如果在同一台计算机上同时执行多个大任务,则可能导致严重的问题。这也可能导致某种随机行为,因此,如果您不小心验证正在发生的事情的理论,就很容易得出错误的结论。

在YARN下,通常取决于过度使用。如果您在Kubernetes上运行,则更常见。

不幸的是,如果在问题上没有更多细节/一致性,那么这种概括性就是可以提供的最好的。