问题描述
我需要使用 dask 和 Xarray 计算两个数据集(每月重新采样的两个每日变量)之间的差异。这是我的代码:
def diff(path_1,path_2):
import xarray as xr
max_v=xr.open_mfdataset(path_1,combine='by_coords',concat_dim="time",parallel=True)['variable_1'].resample({'time': '1M'}).max()
min_v=xr.open_mfdataset(path_2,parallel=True)['variable_2'].resample({'time': '1M'}).min()
return (max_v-min_v).compute()
future = client.submit(diff,path_1,path_2)
diff = client.gather(future)
我也试过这个:
%%time
def max_var(path):
import xarray as xr
multi_file_dataset = xr.open_mfdataset(path,parallel=True)
max_v=multi_file_dataset['variable_1'].resample(time='1M').max(dim='time')
return max_v.compute()
def min_var(path):
import xarray as xr
multi_file_dataset = xr.open_mfdataset(path,parallel=True)
min_v=multi_file_dataset['variable_2'].resample(time='1M').min(dim='time')
return min_v.compute()
futures=[]
future = client.submit(max_temp,path1)
futures.append(future)
future = client.submit(min_temp,path2)
futures.append(future)
results = client.gather(futures)
diff = results[0]-results[1]
但我注意到在 getitem-nanmax e getitem-nanmin 的最后一步(例如 1980 年中的 1974 年)的计算变得非常缓慢。
这里是集群配置:
cluster = SLURMCluster(walltime='1:00:00',cores=5,memory='5GB')
cluster.scale(jobs=10)
每个数据集由几个文件组成:总大小=7GB
有没有更好的方法来实现这个计算?
谢谢
解决方法
不能 100% 确定这适用于您的情况,但没有 mwe
就很难做得更好。所以,我怀疑 .compute()
使用的 xarray
可能与 client.submit
发生冲突,因为现在计算正在工作人员身上进行,我不确定它是否可以正确分配工作同行(但这是一个怀疑,我不确定)。因此,解决此问题的一种方法是将计算放入主脚本中(因为 xarray
将与背景中的 dask
集成),所以这可能会起作用:
import xarray as xr
max_v=xr.open_mfdataset(path_1,combine='by_coords',concat_dim="time",parallel=True,chunks={'time': 10})['variable_1'].resample({'time': '1M'}).max()
min_v=xr.open_mfdataset(path_2,chunks={'time': 10})['variable_2'].resample({'time': '1M'}).min()
diff_result = (max_v-min_v).compute()
以下是不同数据集上的 mwe
:
import xarray as xr
# chunks option will create dask array
ds = xr.tutorial.open_dataset('rasm',decode_times=True,chunks={'time': 10})
# these are lazy calculations
max_v = ds['Tair'].resample({'time': '1M'}).max()
min_v = ds['Tair'].resample({'time': '1M'}).min()
# this will use dask scheduler in the background
diff_result = (max_v-min_v).compute()
# since the data refers to the same variable,all the results will be either 0 or `nan` (if the variable was not available in that time/x/y combination)