使用 Dask 和 Xarray 进行并行计算

问题描述

我有以下功能

@dask.delayed
def load_ds(p):
    import xarray as xr
    multi_file_dataset = xr.open_mfdataset(p,combine='by_coords',concat_dim="time",parallel=True)
    mean = multi_file_dataset['tas'].mean(dim='time')
    return mean

打开一组 NetCDF 文件(由路径 p 标识)并计算随时间变化的平均值。

我试图在两个不同的路径(= 数据集)上并行运行该函数

results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
   
results = dask.compute(*results)

我也试过

results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
  
futures = dask.persist(*results)
results = dask.compute(*futures)

但是,当我尝试检索结果时,我注意到执行实际上开始了:

 print(results[0].values)

再次,当我检索第二个

 print(results[1].values)

怎么了?有没有办法只检索一次结果对象?

解决方法

鉴于您目前所做的工作,您会怎么做:

delayed_task = dask.delayed(
    lambda L: (L[0].values,L[1].values)
)(results)

然后“稍后”

tup = delayed_task.compute()