一、Spark调优概述 --------------------------------------------- 1.Spark调优分类 a.常规性能调优:分配资源、并行度。。。等 b.JVM调优(Java虚拟机):JVM相关的参数,通常情况下,如果你的硬件配置、基础的JVM的配置,都ok的话,JVM通常不会造成太严重的性能问题; 反而更多的是,在troubleshooting中,JVM占了很重要的地位;JVM造成线上的spark作业的运行报错,甚至失败(比如OOM)。 c.shuffle调优(相当重要):spark在执行groupByKey、reduceByKey等操作时的,shuffle环节的调优。这个很重要。 shuffle调优,其实对spark作业的性能的影响,是相当之高!!!经验:在spark作业的运行过程中,只要一牵扯到有shuffle的操作, 基本上shuffle操作的性能消耗,要占到整个spark作业的50%~90%。10%用来运行map等操作,90%耗费在两个shuffle操作。 groupByKey、countByKey。 d.spark算子调优(spark算子调优,比较重要):groupByKey,countByKey或aggregateByKey来重构实现。 有些算子的性能,是比其他一些算子的性能要高的。foreachPartition替代foreach。如果一旦遇到合适的情况,效果还是不错的。 2.主要手段 a.分配资源、并行度、RDD重构与持久化 b.shuffle调优 c.spark算子调优 d.JVM调优、广播大变量。。。 二、JVM内存不足的影响 ------------------------------------------- 内存不充足的时候,问题: 1.频繁minor gc(年轻代垃圾回收计机制),也会导致频繁spark停止工作 2.老年代囤积大量活跃对象(短生命周期的对象),导致频繁full gc,full gc时间很长,短则数十秒,长则数分钟,甚至数小时。 可能导致spark长时间停止工作。 3、严重影响spark的性能和运行的速度。 三、JVM调优的第一个点:降低cache操作的内存占比 ---------------------------------------------- spark中,堆内存又被划分成了两块儿,一块儿是专门用来给RDD的cache、persist操作进行RDD数据缓存用的; 另外一块儿,就是我们刚才所说的,用来给spark算子函数的运行使用的,存放函数中自己创建的对象。 默认情况下,给RDD cache操作的内存占比,是0.6,60%的内存都给了cache操作了。 但是问题是,如果某些情况下,cache不是那么的紧张,问题在于task算子函数中创建的对象过多,然后内存又不太大, 导致了频繁的minor gc,甚至频繁full gc,导致spark频繁的停止工作。性能影响会很大。 针对上述这种情况,大家可以在之前我们讲过的那个spark ui。yarn去运行的话,那么就通过yarn的界面, 去查看你的spark作业的运行统计,很简单,大家一层一层点击进去就好。 可以看到每个stage的运行情况,包括每个task的运行时间、gc时间等等。如果发现gc太频繁,时间太长。 此时就可以适当调节这个比例。 降低cache操作的内存占比,大不了用persist操作,选择将一部分缓存的RDD数据写入磁盘,或者序列化方式, 配合Kryo序列化类,减少RDD缓存的内存占用;降低cache操作内存占比; 对应的,算子函数的内存占比就提升了。这个时候,可能,就可以减少minor gc的频率,同时减少full gc的频率。 对性能的提升是有一定的帮助的。 一句话,让task执行算子函数时,有更多的内存可以使用。 四、如何降低cache操作的内存占比? ---------------------------------------------- newSparkConf().set("spark.storage.memoryFraction",0.6(默认)) ---> 0.5 -> 0.4 -> 0.2 五、JVM调优第二点:调节executor堆外内存与连接等待时长 --------------------------------------------- 1.什么情况下去调节executor堆外内存? 有时候,如果你的spark作业处理的数据量特别特别大,几亿数据量;然后spark作业一运行, 时不时的报错,shuffle file cannot find,executor、task lost,out of memory(内存溢出) 出现这种情况,可能就是对外内存不太够用,导致executor在运行的过程中,可能会内存溢出; 然后导致后续的stage的task在运行的时候,可能要从一些executor中去拉取shuffle map output文件, 但是executor可能已经挂掉了,关联的block manager也没有了;所以可能会报shuffle output file not found;resubmitting task;executor lost;spark作业彻底崩溃。 一旦出现上述情况下,可以去考虑调节一下executor的堆外内存。也许就可以避免报错; 此外,有时,堆外内存调节的比较大的时候,对于性能来说,也会带来一定的提升。 2.如何调节executor堆外内存? spark-submit --conf spark.yarn.executor.memoryOverhead=2048 注意!!!切记,不是在你的spark作业代码中,用new SparkConf().set()这种方式去设置,不要这样去设置, 是没有用的!一定要在spark-submit脚本中去设置。 默认情况下,这个堆外内存上限大概是300多M;后来我们通常项目中,真正处理大数据的时候,这里都会出现问题, 导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G 通常这个参数调节上去以后,就会避免掉某些JVM OOM的异常问题,同时呢,会让整体spark作业的性能, 得到较大的提升。 3.为什么要调节连接等待时长? task创建的对象特别大,特别多,就会导致JVM内存满溢,从而频繁的垃圾回收。但是一旦垃圾回收,所有的spark工作 线程就会停止工作,无法提供相应。 假设,一个task需要map数据,但是发现自己的excutor对应的blockmanager上没有,那么就会去其他的节点上去拉取 map数据,通过网络传输方式。恰巧的是,其他节点正好处于垃圾回收阶段,spark无法进行网络响应。导致网络连接失败。 而spark默认的等待时长是60s,当超过60s,连接超时,就宣告失败了。如此几次之后,发现都拉取不到数据,整个spark 就会崩溃 一般会伴随出现,某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。的异常 这个时候可以考虑调节连接超时时长 4.怎么调节连接等待时长? spark-submit spark-submit脚本,切记,不是在new SparkConf().set()这种方式来设置的。 spark.core.connection.ack.wait.timeout(spark core,connection,连接,ack,wait timeout,建立不上连接的时候,超时等待时长) 调节这个值比较大以后,通常来说,可以避免部分的偶尔出现的某某文件拉取失败,某某文件lost掉了。。。