问题描述
TLDR:
如何有效地使用 this.props.textinitialValue
将多个 <Formik
key={this.props.textinitialValue}
...
支持的 dask-distributed
数据集写入 AWS S3 上的 dask
存储?
详情:
我有一个工作流,它在 S3 上获取栅格数据集列表并生成一个 dask-array 支持的 xarray 数据集。
我需要迭代多个组,其中对于每个组,工作流获取属于该组的栅格数据集并生成相应的 xarray 数据集。
现在我想将数据集中的数据写入 S3 上的 zarr 存储(同一个存储,仅使用 xarray
参数)。
这是用于顺序处理的伪代码的样子:
zarr
这很有效,一旦执行 group
,数据就会加载并存储在 S3 上,任务并行运行。
现在我想使用 client = Client(...) # using a distributed cluster
zarr_store = fsspec.get_mapper("s3://bucket/key.zarr")
for group_select in groups:
xr_dataset = get_dataset_for_group(group_select)
# totally unnecessary,just to illustrate that this is a lazy dataset,nothing has been loaded yet
assert dask.is_dask_collection(xr_dataset)
xr_dataset.to_zarr(zarr_store,group=group_select)
并行运行它。这是我尝试过的以及遇到的问题:
1.使用to_zarr
收集延迟任务列表
这在原理上是有效的,但是速度很慢。创建一个任务大约需要 3-4 秒,我需要运行 100 多次,在实际开始任何计算之前需要 4-5 分钟。
这极大地加快了任务的创建,但是对 zarr 存储的写入不会在工作人员之间拆分,而是在加载任务完成后,处理任务的工作人员会收集所有数据并将其写入 zarr。
3. 将 .to_zarr(...,compute=False)
包装在自定义函数中并将其传递给 dask.delayed
这看起来是最有前途的选择。我刚刚将 to_zarr
调用包装在一个自定义函数中,该函数可以从工作人员调用:
client.submit
使用 to_zarr
执行此操作会将写入任务放回调度程序并解决我在上面使用 def dump(ds,target,group=None):
with worker_client() as client:
ds.to_zarr(store=target,group=group)
return True
遇到的问题。
然而,当我重复提交这个函数时(我需要这样做 100+ 次)沿着
worker_client
我很快就用要处理的任务压倒了调度程序。
我能想到的唯一明显的解决方案是分批拆分数据集,只有在前一个完成后才开始一个新的。但是没有更优雅的解决方案吗?或者在 dask(分布式)中是否有内置功能?