问题描述
我正在尝试使用 len(dataframe[column])
查找 dask 数据帧的长度,但每次尝试执行此操作时都会出现错误:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
Traceback (most recent call last):
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\queues.py",line 238,in _Feed
send_bytes(obj)
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py",line 200,in send_bytes
self._send_bytes(m[offset:offset + size])
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py",line 280,in _send_bytes
ov,err = _winapi.WriteFile(self._handle,buf,overlapped=True)
brokenPipeError: [WinError 232] The pipe is being closed
distributed.nanny - ERROR - Nanny Failed to start process
Traceback (most recent call last):
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\nanny.py",line 575,in start
await self.process.start()
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\process.py",line 34,in _call_and_set_future
res = func(*args,**kwargs)
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\site-packages\distributed\process.py",line 202,in _start
process.start()
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py",line 112,in start
self._popen = self._Popen(self)
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\context.py",line 223,in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\context.py",line 322,in _Popen
return Popen(process_obj)
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\popen_spawn_win32.py",line 89,in __init__
reduction.dump(process_obj,to_child)
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\reduction.py",line 60,in dump
ForkingPickler(file,protocol).dump(obj)
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py",line 948,in reduce_pipe_connection
dh = reduction.DupHandle(conn.fileno(),access)
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py",line 170,in fileno
self._check_closed()
File "C:\Users\thakneh\AppData\Local\Continuum\anaconda3\lib\multiprocessing\connection.py",line 136,in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
我的 dask 数据框有 1000 万行。有什么办法可以解决这个错误。
解决方法
我觉得找到列的长度不会那么简单,因为 Dask 可能正在从各种来源构建数据框 - 类似于为什么您可以在数据框上获取 .head()
,但需要做一些事情额外做.tail()
。
由于您使用了如此大的数据框,我相信 Python 会将 len()
中的任何内容加载到内存中。
我有两个建议,但我不确定它们不会触发相同的异常。
使用 pipe
让我们看看这是否可行,您可以尝试在您的列上使用 pipe
并将 len
传递给它,也许可以。
dataframe["column"].pipe(len)
参考这里是 pipe documentation
分区
我认为它可以帮助的一件事是,如果您将列分区为块,这可能有助于保持较低的内存使用率,唯一的问题是您必须对这些分区的大小做一些来宾工作会。
您必须跟踪的另一件事是每个分区的长度,这可能有点混乱,我觉得必须有更好的方法来做到这一点。
length = 0
len += dataframe["column"].partitions[:10000]
len += dataframe["column"].partitions[:20000]
当然,您可以尝试使用循环使代码更简洁一些。
参考这里是 dataframe.partitions
请让我知道这些是否有效,我希望我能帮助你。