从 Parquet 加载数据帧并计算 RAM 中的最大爆炸

问题描述

我是 dask 的新手,并使用行组将 Pandas Dataframe 导出到 Parquet:

x.to_parquet(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq',row_group_size=1000)

然后我尝试用 dask 加载它,它似乎工作正常(?):

x = dd.read_parquet(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq')
x

# Note: The dataframe has almost 2000 columns,I clipped them for here
dask DataFrame Structure:
                        open     h
npartitions=978                   
2019-07-21 23:55:00  float64  floa
2019-07-22 16:35:00      ...      
                      ...      ...
2021-05-30 17:06:00      ...      
2021-05-31 03:32:00      ...      
dask Name: read-parquet,978 tasks

到目前为止,没有问题。但是当我对其调用 x.max().compute() 时,dask 似乎将整个数据集加载到 RAM 中(至少 RAM 像疯了一样上升)然后崩溃。只看max()

x = x.max()
x

dask Series Structure:
npartitions=1
ACCBL_10    float64
volume          ...
dtype: float64
dask Name: dataframe-max-agg,1957 tasks

根据 dask 教程 https://tutorial.dask.org/04_dataframe.html#Computations-with-dask.dataframe 我的理解,这应该可以正常工作(?)

当我尝试仅在一列上调用 max() 时,它也会耗尽内存:

x.open.max().compute()

是我做错了什么,还是它应该是这样工作的,我必须做一些不同的事情?

我现在还尝试使用 distributed 系统并将客户端限制为 10GB,但 dask 再次占用了 24GB 的 RAM,并且只是打印了一条警告,指出工作组完全超出了设置的内存限制:

if __name__ == '__main__':

    client = Client(processes=False,memory_limit='5GB')

    x = dd.read_parquet(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq')
    print(x)
    s = x.max().compute()
    print(s)


distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 24.13 GB -- Worker memory limit: 5.00 GB

解决方法

如果可能,我会将镶木地板保存到多个文件中(大小取决于您的硬件,但在笔记本电脑上,每个分区大约 100-200 MB 会很好)。如果这不是一个选项,请尝试以下操作:

x.open.max(split_every=2).compute()

这样做是要求 dask 计算每个分区的最大值,然后比较每 2 个分区的最大值,这以运行更多任务为代价减少了内存占用。您可以尝试使用 split_every 数字,看看您的硬件是否可以容忍更高的值,但希望 2 会起作用。

此外,如果您打算处理单个文件,使用 vaex 可能会获得更好的性能,请参阅 this comparison

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...