同时将 xarray 数据集写入 zarr - 如何通过分布式 dask 有效扩展

问题描述

TLDR:

如何有效地使用 this.props.textInitialValue 将多个 <Formik key={this.props.textInitialValue} ... 支持的 dask-distributed 数据集写入 AWS S3 上的 dask 存储?

详情

我有一个工作流,它在 S3 上获取栅格数据集列表并生成一个 dask-array 支持的 xarray 数据集。

我需要迭代多个组,其中对于每个组,工作流获取属于该组的栅格数据集并生成相应的 xarray 数据集。

现在我想将数据集中的数据写入 S3 上的 zarr 存储(同一个存储,仅使用 xarray 参数)。

这是用于顺序处理的伪代码的样子:

zarr

这很有效,一旦执行 group,数据就会加载并存储在 S3 上,任务并行运行。


现在我想使用 client = Client(...) # using a distributed cluster zarr_store = fsspec.get_mapper("s3://bucket/key.zarr") for group_select in groups: xr_dataset = get_dataset_for_group(group_select) # totally unnecessary,just to illustrate that this is a lazy dataset,nothing has been loaded yet assert dask.is_dask_collection(xr_dataset) xr_dataset.to_zarr(zarr_store,group=group_select) 并行运行它。这是我尝试过的以及遇到的问题:

1.使用to_zarr收集延迟任务列表

这在原理上是有效的,但是速度很慢。创建一个任务大约需要 3-4 秒,我需要运行 100 多次,在实际开始任何计算之前需要 4-5 分钟。

2. 将其包装成 dask.distribuited

这极大地加快了任务的创建,但是对 zarr 存储的写入不会在工作人员之间拆分,而是在加载任务完成后,处理任务的工作人员会收集所有数据并将其写入 zarr。

3..to_zarr(...,compute=False) 包装在自定义函数中并将其传递给 dask.delayed

这看起来是最有前途的选择。我刚刚将 to_zarr 调用包装在一个自定义函数中,该函数可以从工作人员调用:

client.submit

使用 to_zarr 执行此操作会将写入任务放回调度程序并解决我在上面使用 def dump(ds,target,group=None): with worker_client() as client: ds.to_zarr(store=target,group=group) return True 遇到的问题。

然而,当我重复提交这个函数时(我需要这样做 100+ 次)沿着

worker_client

我很快就用要处理的任务压倒了调度程序。

我能想到的唯一明显的解决方案是分批拆分数据集,只有在前一个完成后才开始一个新的。但是没有更优雅的解决方案吗?或者在 dask(分布式)中是否有内置功能?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)