达斯克杀害工人,同时读取并保存大的.csv文件

问题描述

我大约有1.5 TB的数据分为大约5500个json文件,我需要使用map_partition处理(NN搜索)并保存结果。 (GCS)。 每个.json文件的大小在100-400 MB之间。 另外,我使用一个集群,每个集群有250个cpu,每个处理器有16个内核,每个内存有60 GB。因此,整体内存限制约为14 TB。 但是,其中一名工人总是死于某种我无法理解的原因。

distributed.scheduler.KilledWorker: ("('bag-from-delayed-loads-to_dataframe-aa638adfe0c457d6aa064850d188edc1',1645)",<Worker 'tcp://10.0.8.221:42137',name: tcp://10.0.8.221:42137,memory: 0,processing: 15>)

即使我运行下面的朴素代码块来简单地加载json数据,将其转换为dataframe并将其保存为csv,我也会遇到相同的问题。 但是,对于较小的数据集,效果很好。

data = db.read_text("gs://path-to-input-files/*").map(json.loads)
# data = data.repartition(npartitions=10000)
data = data.to_dataframe(Meta={"a": object,"b": np.int32,"c": object})
data.to_csv("gs://path-to-output-files/*.csv,index=False)
Traceback (most recent call last):
  File "run_search.py",line 69,in <module>
    nn_search.run(client)
  File "/github_consumer-edge_mid-match-inference/mid_match/k_nn_search.py",line 92,in run
    data.to_csv(
  File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py",line 1408,in to_csv
    return to_csv(self,filename,**kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/csv.py",line 892,in to_csv
    delayed(values).compute(**compute_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/base.py",line 166,in compute
    (result,) = compute(self,traverse=False,**kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/base.py",line 444,in compute
    results = schedule(dsk,keys,**kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 2682,in get
    results = self.gather(packed,asynchronous=asynchronous,direct=direct)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 1976,in gather
    return self.sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 831,in sync
    return sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py",line 339,in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py",line 323,in f
    result[0] = yield future
  File "/opt/conda/lib/python3.8/site-packages/tornado/gen.py",line 735,in run
    value = future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 1841,in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('bag-from-delayed-loads-to_dataframe-aa638adfe0c457d6aa064850d188edc1',processing: 15>)

这是仪表板的视频。 https://drive.google.com/file/d/1Ywbn2lLEBkRi5a0iD7ZJ1Hx2GNUKfgjG/view?usp=sharing

随着时间的推移,存储的字节数(从仪表板-左上方的直方图)一直在增加,这就是为什么我认为其中一个工作程序内存不足的原因。
从仪表板上看,读取json并将其转换为dask数据帧似乎比保存.csv并上传到gcs快得多。

  1. 是read_text()读取数据的速度快于工作人员释放内存的速度吗?
  2. 我是否应该等到对特定分区执行data.to_csv()之后,工人才能读取更多数据?如果是这样,那我应该怎么做呢?诸如dask.distributed.wait之类的东西?
  3. 我尝试将数据帧重新划分为更多的分区,例如10K甚至50K(假设重新分区将导致较小的数据大小),但仍然无济于事。
  4. 在这里想念什么?有人可以帮我指出一些对我有帮助的文章文件吗?

PS:我是dask和分布式计算的新手。

引用:What do KilledWorker exceptions mean in Dask?

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...