Problem description
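import dask

@dask.delayed
def load_ds(p):
    import xarray as xr
    # combine='by_coords' orders the files by their coordinate values, so
    # concat_dim is dropped here (recent xarray versions reject the two together).
    multi_file_dataset = xr.open_mfdataset(p, combine='by_coords', parallel=True)
    # Lazy reduction: this builds a dask-backed DataArray, not computed values.
    mean = multi_file_dataset['tas'].mean(dim='time')
    return mean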
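This function opens a set of NetCDF files (identified by the path p) and computes the mean over time.
I am trying to run it in parallel on two different paths (= datasets):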
results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
results = dask.compute(*results)
I have also tried:
results = []
result1 = dask.delayed(load_ds)(path1)
results.append(result1)
result2 = dask.delayed(load_ds)(path2)
results.append(result2)
futures = dask.persist(*results)
results = dask.compute(*futures)
However, when I try to retrieve the results, I notice that execution actually only starts at that point:
print(results[0].values)
and starts again when I retrieve the second one:
print(results[1].values)
What is going on? Is there a way to retrieve the result objects just once?
Solution
Given what you have done so far, you could do the following:
delayed_task = dask.delayed(
    lambda L: (L[0].values, L[1].values)
)(results)
Then, "later":
tup = delayed_task.compute()
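As for why this happens: as far as I can tell, load_ds returns a lazy, dask-backed DataArray, so dask.compute only runs the delayed wrapper and hands back that still-lazy object; the actual reduction does not run until .values is accessed. Below is a minimal sketch of the same effect. It uses dask.array instead of xarray so that it runs without any NetCDF files, and lazy_mean and eager_mean are hypothetical stand-ins for load_ds, not part of the original code:

```python
import dask
import dask.array as da
import numpy as np


@dask.delayed
def lazy_mean(n):
    # Stand-in for load_ds: build a chunked array and reduce it.
    x = da.ones((n, 4), chunks=(2, 4))
    # The reduction is only described here; the return value is still lazy.
    return x.mean(axis=0)


@dask.delayed
def eager_mean(n):
    x = da.ones((n, 4), chunks=(2, 4))
    # Force the reduction inside the task so concrete values are returned.
    return x.mean(axis=0).compute()


# dask.compute runs each delayed function once and returns a tuple of results.
(lazy_result,) = dask.compute(lazy_mean(6))
(eager_result,) = dask.compute(eager_mean(6))

# Still a dask array: a second compute would be needed to get numbers.
print(isinstance(lazy_result, da.Array))
# Already a NumPy array: nothing is left to compute.
print(isinstance(eager_result, np.ndarray))
```

Applied to load_ds, the equivalent would presumably be to return mean.compute() (or mean.values) from inside the function, so that dask.compute(*results) already yields concrete values.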