同时将 xarray 数据集写入 zarr - 如何通过分布式 dask 有效扩展

问题描述

TLDR:

如何有效地使用 this.props.textinitialValue 将多个 <Formik key={this.props.textinitialValue} ... 支持dask-distributed 数据集写入 AWS S3 上的 dask 存储?

详情

我有一个工作流,它在 S3 上获取栅格数据集列表并生成一个 dask-array 支持的 xarray 数据集。

我需要迭代多个组,其中对于每个组,工作流获取属于该组的栅格数据集并生成相应的 xarray 数据集。

现在我想将数据集中的数据写入 S3 上的 zarr 存储(同一个存储,仅使用 xarray 参数)。

这是用于顺序处理的伪代码的样子:

zarr

这很有效,一旦执行 group,数据就会加载并存储在 S3 上,任务并行运行。


现在我想使用 client = Client(...) # using a distributed cluster zarr_store = fsspec.get_mapper("s3://bucket/key.zarr") for group_select in groups: xr_dataset = get_dataset_for_group(group_select) # totally unnecessary,just to illustrate that this is a lazy dataset,nothing has been loaded yet assert dask.is_dask_collection(xr_dataset) xr_dataset.to_zarr(zarr_store,group=group_select) 并行运行它。这是我尝试过的以及遇到的问题:

1.使用to_zarr收集延迟任务列表

这在原理上是有效的,但是速度很慢。创建一个任务大约需要 3-4 秒,我需要运行 100 多次,在实际开始任何计算之前需要 4-5 分钟。

2. 将其包装成 dask.distribuited

这极大地加快了任务的创建,但是对 zarr 存储的写入不会在工作人员之间拆分,而是在加载任务完成后,处理任务的工作人员会收集所有数据并将其写入 zarr。

3..to_zarr(...,compute=False) 包装在自定义函数中并将其传递给 dask.delayed

这看起来是最有前途的选择。我刚刚将 to_zarr 调用包装在一个自定义函数中,该函数可以从工作人员调用

client.submit

使用 to_zarr 执行此操作会将写入任务放回调度程序并解决我在上面使用 def dump(ds,target,group=None): with worker_client() as client: ds.to_zarr(store=target,group=group) return True 遇到的问题。

然而,当我重复提交这个函数时(我需要这样做 100+ 次)沿着

worker_client

我很快就用要处理的任务压倒了调度程序。

我能想到的唯一明显的解决方案是分批拆分数据集,只有在前一个完成后才开始一个新的。但是没有更优雅的解决方案吗?或者在 dask(分布式)中是否有内置功能

解决方法

在我的经验/环境中,太多任务(以及太多需要协调的工作人员)很容易让调度程序不堪重负,因此将事情分成批次通常是可行的。

要创建一个移动的工作队列,您可以使用 as_completed,每次完成另一个任务时提交/添加任务。请参阅以下相关答案:12

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...