How can we avoid out-of-memory errors when creating large arrays from PySpark and Parquet for data analysis?

Problem description

We are experimenting with a somewhat unusual requirement. We have a Parquet table holding roughly 320 GB of frequency pixel values, extracted from 3-dimensional spectral line image cubes (FITS source files).

+--------------------+------------+-------+
|            col_name|   data_type|comment|
+--------------------+------------+-------+
|           spi_index|         int|   null|
|           spi_image|array<float>|   null|
|        spi_filename|      string|   null|
|            spi_band|         int|   null|
|# Partition Infor...|            |       |
|          # col_name|   data_type|comment|
|        spi_filename|      string|   null|
|            spi_band|         int|   null|
+--------------------+------------+-------+
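For context, the table is read straight into a dataframe before any of the processing below; a minimal sketch, with spi_pixels as an illustrative table name:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sofia-partitioner").getOrCreate()

# "spi_pixels" is an illustrative name for the table described above
spiDF = spark.table("spi_pixels")
# or, reading the Parquet files directly:
# spiDF = spark.read.parquet("hdfs:///<path to parquet table>")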

A legacy source-finding algorithm written in C analyses the original source files, but it is constrained by the physical memory available on a single machine. We are running a CDH Express cluster: 3 management nodes (8 cores, 32 GB RAM), 10 worker nodes (16 cores, 64 GB RAM), and a gateway node running JupyterHub (8 cores, 32 GB RAM). We have converted the original C program into a shared object and distributed it across the cluster, and we have wrapped the C shared object in a partitioner class so that we can run multiple partitions in parallel across the cluster's executors. We launch and run it with PySpark.
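For illustration, the wrapper amounts to loading the shared object on each executor and handing it one contiguous float32 buffer. This is only a sketch; the library name libsofia.so and the entry-point signature here are stand-ins, not the real sofia interface used further below:

import ctypes
import numpy as np
from pyspark import SparkFiles

# Load the shared object shipped to every executor via --files /
# SparkContext.addFile; "libsofia.so" is a stand-in name.
lib = ctypes.CDLL(SparkFiles.get("libsofia.so"))
lib.sofia_mainline.restype = ctypes.c_int      # hypothetical entry point

def run_sofia(pixels):
    # The C side expects one flat, contiguous float32 buffer.
    buf = np.ascontiguousarray(pixels, dtype="<f4").ravel()
    return lib.sofia_mainline(
        buf.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
        ctypes.c_size_t(buf.size))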

The problem we are hitting is that ideally we need an input pixel array of at least ~15 GB, yet we seem to run into trouble creating an array of ~7.3 GB, and we are not sure why that is.
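For scale, here is our back-of-envelope element count at those two sizes (float32 pixels, so 4 bytes each):

GiB = 1024**3                         # bytes per GiB
BYTES_PER_PIXEL = 4                   # '<f4' float32 pixels

print(7.3 * GiB / BYTES_PER_PIXEL)    # ~1.96e9 elements -- where creation starts failing
print(15 * GiB / BYTES_PER_PIXEL)     # ~4.03e9 elements -- what we ideally need
# For reference, a single JVM array tops out at 2**31 - 1 (~2.15e9) elements.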

YARN maximum allocation setting:

yarn.scheduler.maximum-allocation-mb = 40GB

Spark configuration settings:

--executor-memory 18g \
--num-executors 29 \
--driver-memory 18g \
--executor-cores 5 \
--conf spark.executor.memoryOverhead='22g' \
--conf spark.driver.memoryOverhead='22g' \
--conf spark.driver.maxResultSize='24g' \
--conf "spark.executor.extrajavaoptions-XX:MaxDirectMemorySize=20G -XX:+UseCompressedOops" \

Summary of the Partitioner class:

import sys
import numpy as np
from pyspark import SparkFiles

class Partitioner:
    def __init__(self):
        self.callPerDriverSetup()

    def callPerDriverSetup(self):
        pass

    def callPerPartitionSetup(self):
        # Make the SoFiA shared-object wrapper importable on the executor
        sys.path.append('sofia')

        import sofia
        import myLib

        import faulthandler
        import time as tm
        from time import time, clock, process_time, sleep

        self.sofia = sofia
        self.myLib = myLib
        self.parameterFile = SparkFiles.get('sofia.par')
    
    def doProcess(self, element):
        ### Here's the call to the C library for each row of the dataframe
        ### partition. In here we have to transform the flattened array data
        ### to the format SoFiA requires, as well as rebuild the FITS header.

        ra = np.array(element.right_ascension, dtype='<f4')
        dec = np.array(element.declination, dtype='<f4')
        frequencies = np.array(element.frequency, dtype='<f4')
        Pixels = np.array(element.pixels, dtype='<f4')

        dataPtr = Pixels.ravel()
            
        #
        # Create the dictionary of the original header
        #
        hdrKey = np.array(element.keys, dtype='U')
        hdrValue = np.array(element.values, dtype='U')
        hdrDict = dict(zip(hdrKey, hdrValue))

        newHdr = self.myLib.CreateHeader(hdrDict)

        # Get the crpix adjustment values for the new header
        crpix1, crpix2, crpix3, crpix4 = self.myLib.GetHeaderUpdates(
            newHdr,
            element.raHeaderIndex,
            element.decHeaderIndex,
            element.freqHeaderIndex)

        # Get the new axis values
        naxis1 = len(ra)
        naxis2 = len(dec)
        naxis4 = len(frequencies)

        newHdr['CRPIX1'] = float(crpix1)
        newHdr['CRPIX2'] = float(crpix2)
        newHdr['CRPIX3'] = float(crpix3)
        newHdr['CRPIX4'] = float(crpix4)

        newHdr['NAXIS1'] = float(naxis1)
        newHdr['NAXIS2'] = float(naxis2)
        newHdr['NAXIS4'] = float(naxis4)

        newHdr.pop("END", None)

        hdrstr, hdrsize = self.myLib.dict2FITsstr(newHdr)

        path_to_par = self.parameterFile
        parsize = len(path_to_par)
        # Pass off to the SoFiA C library
        try:
            # This is the call to the shared object
            ret = self.sofia.sofia_mainline(dataPtr, hdrstr, hdrsize,
                                            path_to_par, parsize)
            returnCode = ret[0]
            sofiaMsg = "Call to sofia has worked!"
        except RuntimeError:
            print("Caught general exception\n")
            sofiaMsg = "Caught general exception"
            returnCode = 1
        except StopIteration:
            print("Caught Null pointer\n")
            sofiaMsg = "Caught Null pointer"
            returnCode = 2
        except MemoryError:
            print("Caught ALLOC error\n")
            sofiaMsg = "Caught ALLOC error"
            returnCode = 3
        except IndexError:
            print("Caught index range error\n")
            sofiaMsg = "Caught index range error"
            returnCode = 4
        except IOError:
            print("Caught file error\n")
            sofiaMsg = "Caught file error"
            returnCode = 5
        except OverflowError:
            print("Caught integer overflow error\n")
            sofiaMsg = "Caught integer overflow error"
            returnCode = 6
        except TypeError:
            print("Caught user input error\n")
            sofiaMsg = "Caught user input error"
            returnCode = 7
        except SystemExit:
            print("Caught no sources error\n")
            sofiaMsg = "Caught no sources error"
            returnCode = 8


        if returnCode == 0:
            catalogXML = ret[5].tobytes()
        else:
            catalogXML = ""

        # Join the output fields with a delimiter that cannot appear in the data
        DELIMITER = chr(255)
        msg = DELIMITER.join([str(returnCode),
                              str(element.raHeaderIndex),
                              str(element.decHeaderIndex),
                              str(len(ra)),
                              str(len(dec)),
                              str(len(frequencies)),
                              str(element.freqHeaderIndex),
                              str(catalogXML),
                              str(dataPtr.nbytes)])

        return msg

    def processpartition(self, partition):
        self.callPerPartitionSetup()
        for element in partition:
            yield self.doProcess(element)
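As a sanity check, the class can be exercised on a single row before going wide; a minimal sketch, assuming a small dataframe testDF with the same columns, and that sofia.par and the shared object have been shipped with --files:

# Push one row through the same code path that mapPartitions will use
p = Partitioner()
row = testDF.limit(1).collect()[0]    # testDF: small test dataframe (assumption)
p.callPerPartitionSetup()
print(p.doProcess(row))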

We create a dataframe in which each row represents one 3-dimensional data array: array columns for the three positional dimensions, plus an array holding the pixel values (referred to as ra, dec, frequencies and Pixels in doProcess). Each row also carries the coordinate-system information from the original source file, which we use to build a new set of header information; the rows are then handed to an instantiated instance of the Partitioner class via a df.rdd.mapPartitions call.

p = Partitioner()
try:
    ...
    ...
    ...
    ...
    # Creates the positional array dataframes, and the single rows
    # representing the 3d images in finalSubcubedF
    #
    ...
    ...
    ...
    finalSubcubedF = finalSubcubedF\
        .join(broadcast(rarangeDF), finalSubcubedF.bins == rarangeDF.grp, "inner")\
        .join(broadcast(decRangeDF), finalSubcubedF.bins == decRangeDF.grp, "inner")\
        .join(broadcast(freqRangeDF), finalSubcubedF.bins == freqRangeDF.grp, "inner")\
        .join(broadcast(hdrDF), finalSubcubedF.bins == hdrDF.grp, "inner")\
        .select("bins", "right_ascension", "declination", "frequency",
                "raHeaderIndex", "decHeaderIndex", "freqHeaderIndex",
                "pixels", "keys", "values")

    # Repartition on the bins column to distribute the processing
    finalSubcubedF = finalSubcubedF.repartition("bins")
    finalSubcubedF.persist(StorageLevel.MEMORY_AND_DISK_SER)

    ...
    # Calls the partitioner class which contains the C shared object calls, as above
    ...

    rddout = finalSubcubedF.rdd.mapPartitions(p.processpartition)
    DELIMITER = chr(255)
    rddout = rddout.map(lambda x: x.split(DELIMITER))
    ...
    ...
    # Write the results (which triggers the computation), including the catalogue XML file

    rddout.saveAsTextFile("hdfs:///<file path>")
except Exception as e:
    ...
    ...
    ...

Error messages captured during the run:

2021-02-16 11:03:42,830 INFO ERROR! Processthread FAILURE... An error occurred while calling o1989.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 47.1 failed 4 times, most recent failure: Lost task 5.3 in stage 47.1 (TID 5875, hercules-1-2.nimbus.pawsey.org.au, executor 12): ExecutorLostFailure (executor 12 exited caused by one of the running tasks) Reason: Container marked as Failed: container_1612679558527_0181_01_000014 on host: hercules-1-2.nimbus.pawsey.org.au. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal

And the error message from the YARN container logs:

LogType:stdout
Log Upload Time:Tue Feb 16 11:19:03 +0000 2021
LogLength:124
Log Contents:
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 20417"...

So the problem is clearly memory related, but we are not entirely sure why it occurs, given that the executor and driver memory settings appear to be set quite high. At the moment we are just clutching at straws.
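One way to rule out settings being silently overridden is to dump the memory-related configuration the Spark session actually resolved; a minimal check, assuming a live spark session on the driver:

# Print the memory-related configuration Spark is really running with
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    if "memory" in key.lower():
        print(key, "=", value)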

We are aware that collecting distributed data into an array is not recommended under normal circumstances; however, it seems that being able to run multiple instances of the C shared object in parallel across the cluster may be more efficient than running 30-40 GB extractions serially on a single machine.

Thanks in advance for any thoughts and help.
