问题描述
我大约有1.5 TB的数据分为大约5500个json文件,我需要使用map_partition处理(NN搜索)并保存结果。 (GCS)。 每个.json文件的大小在100-400 MB之间。 另外,我使用一个集群,每个集群有250个cpu,每个处理器有16个内核,每个内存有60 GB。因此,整体内存限制约为14 TB。 但是,其中一名工人总是死于某种我无法理解的原因。
distributed.scheduler.KilledWorker: ("('bag-from-delayed-loads-to_dataframe-aa638adfe0c457d6aa064850d188edc1',1645)",<Worker 'tcp://10.0.8.221:42137',name: tcp://10.0.8.221:42137,memory: 0,processing: 15>)
即使我运行下面的朴素代码块来简单地加载json数据,将其转换为dataframe并将其保存为csv,我也会遇到相同的问题。 但是,对于较小的数据集,效果很好。
data = db.read_text("gs://path-to-input-files/*").map(json.loads)
# data = data.repartition(npartitions=10000)
data = data.to_dataframe(Meta={"a": object,"b": np.int32,"c": object})
data.to_csv("gs://path-to-output-files/*.csv,index=False)
Traceback (most recent call last):
File "run_search.py",line 69,in <module>
nn_search.run(client)
File "/github_consumer-edge_mid-match-inference/mid_match/k_nn_search.py",line 92,in run
data.to_csv(
File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py",line 1408,in to_csv
return to_csv(self,filename,**kwargs)
File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/csv.py",line 892,in to_csv
delayed(values).compute(**compute_kwargs)
File "/opt/conda/lib/python3.8/site-packages/dask/base.py",line 166,in compute
(result,) = compute(self,traverse=False,**kwargs)
File "/opt/conda/lib/python3.8/site-packages/dask/base.py",line 444,in compute
results = schedule(dsk,keys,**kwargs)
File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 2682,in get
results = self.gather(packed,asynchronous=asynchronous,direct=direct)
File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 1976,in gather
return self.sync(
File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 831,in sync
return sync(
File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py",line 339,in sync
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py",line 323,in f
result[0] = yield future
File "/opt/conda/lib/python3.8/site-packages/tornado/gen.py",line 735,in run
value = future.result()
File "/opt/conda/lib/python3.8/site-packages/distributed/client.py",line 1841,in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('bag-from-delayed-loads-to_dataframe-aa638adfe0c457d6aa064850d188edc1',processing: 15>)
这是仪表板的视频。 https://drive.google.com/file/d/1Ywbn2lLEBkRi5a0iD7ZJ1Hx2GNUKfgjG/view?usp=sharing
随着时间的推移,存储的字节数(从仪表板-左上方的直方图)一直在增加,这就是为什么我认为其中一个工作程序内存不足的原因。
从仪表板上看,读取json并将其转换为dask数据帧似乎比保存.csv并上传到gcs快得多。
- 是read_text()读取数据的速度快于工作人员释放内存的速度吗?
- 我是否应该等到对特定分区执行data.to_csv()之后,工人才能读取更多数据?如果是这样,那我应该怎么做呢?诸如dask.distributed.wait之类的东西?
- 我尝试将数据帧重新划分为更多的分区,例如10K甚至50K(假设重新分区将导致较小的数据大小),但仍然无济于事。
- 我在这里想念什么?有人可以帮我指出一些对我有帮助的文章或文件吗?
PS:我是dask和分布式计算的新手。
引用:What do KilledWorker exceptions mean in Dask?
谢谢
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)