问题描述
我正在将数据从netcdf传输并重新分配给zarr。该过程很慢,并且没有使用很多cpu。我尝试了几种不同的配置,有时似乎做得更好,但是效果不佳。有没有人有任何技巧可以使运行效率更高?
最后一次尝试(以及先前尝试的一些,也许是全部)(在单机,分布式调度程序和使用线程的情况下),日志给出了以下消息:
distributed.core-信息-事件循环在Worker中无响应10.05s。这通常是由于长时间运行的GIL保持功能或移动大数据块引起的。
以前我在内存耗尽方面存在错误,因此我使用下面的“ stepwise_to_zarr”函数来逐段编写zarr:
def stepwise_to_zarr(dataset,step_dim,step_size,chunks,out_loc,group):
start = dataset[step_dim].min()
end = dataset[step_dim].max()
iis = np.arange(start,end,step_size)
if end > iis[-1]:
iis = np.append(iis,end)
lon=dataset.get_index(step_dim)
first = True
failures = []
for i in range(1,len(iis)):
lower,upper = (iis[i-1],iis[i])
if upper >= end:
lon_list= [l for l in lon if lower <= l <= upper]
else:
lon_list= [l for l in lon if lower <= l < upper]
sub = dataset.sel(longitude=lon_list)
rechunked_sub = sub.chunk(chunks)
write_sync=zarr.ThreadSynchronizer()
if first:
rechunked_sub.to_zarr(out_loc,group=group,consolidated=True,synchronizer=write_sync,mode="w")
first = False
else:
rechunked_sub.to_zarr(out_loc,append_dim=step_dim)
chunks = {'time':8760,'latitude':21,'longitude':20}
ds = xr.open_mfdataset("path to data",parallel=True,combine="by_coords")
stepwise_to_zarr(ds,step_size=10,step_dim="longitude",chunks=chunks,out_loc="path to output",group="group name")
在上面的图中,利用率从〜6%下降至〜0.5%似乎与完成的第一个10度纬度“批次”相符。
背景信息:
- 我正在使用一个32个vcpu和256 GB内存的GCE实例。
- 数据约为600 GB,分布在约150个netcdf文件中。
- 数据在GCS中,我正在使用Cloud Storage FUSE读取和写入数据。
- 我正在将数据从块大小:{'time':1,'latitude':521,'longitude':1440}重分配为chunksizes:{'time':8760,'latitude':21,'longitude' :20}
我尝试过:
- 使用默认的多处理调度程序
- 对进程= True和进程= False的单个计算机(https://docs.dask.org/en/latest/setup/single-distributed.html)使用分布式调度程序。
- 分布式调度程序和默认的多处理调度程序,同时还设置环境变量以避免过度订阅线程,例如:
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
解决方法
最后,我通过写入带有以下块的中间Zarr存储解决了我的问题:{'time':8760,'latitude':260,'longitude':360}。即使CPU的资源仅在工作的一小部分中得到了充分利用,这种处理仍然非常迅速。然后,使用问题中描述的逐步过程的修改版,阅读此中间zarr并将其存储在最终分块中。尽管性能不理想,但性能令人满意。
CPU utilization when writing to intermediate store
CPU utilization when writing from intermediate to final store
代码如下:
def stepwise_to_zarr(dataset,step_dim,step_size,encoding,out_loc,group,include_end=True):
start = dataset[step_dim].min()
end = dataset[step_dim].max()
iis = np.arange(start,end,step_size)
if end > iis[-1]:
iis = np.append(iis,end)
lon=dataset.get_index(step_dim)
first = True
failures = []
for i in range(1,len(iis)):
lower,upper = (iis[i-1],iis[i])
if upper >= end and include_end:
lon_list= [l for l in lon if lower <= l <= upper]
else:
lon_list= [l for l in lon if lower <= l < upper]
sub = dataset.sel(longitude=lon_list)
write_sync=zarr.ThreadSynchronizer()
if first:
sub_write=sub.to_zarr(output_loc,group=varname,consolidated=True,synchronizer=write_sync,encoding=encoding,mode="w",compute=False)
first = False
else:
sub_write=sub.to_zarr(output_loc,append_dim=step_dim,compute=False)
sub_write.compute(retries=2)
z = xr.open_zarr(input_loc,group=groupname)
new_chunks = {'time':8760,'latitude':21,'longitude':20}
z_rechunked=z.chunk(new_chunks)
#Workaround to avoid:NotImplementedError: Specified zarr chunks (8760,260,360) would #overlap multiple dask chunks
#See https://github.com/pydata/xarray/issues/2300
encoding = {}
for v in ['var1','var2','var3']:
encoding.update({v:z[v].encoding.copy()})
encoding[v]["chunks"]=(96408,21,20)
stepwise_to_zarr(z_rechunked,"longitude",60,output_loc,group=groupname)
请注意,我必须重写编码才能重新编组。
此过程有效,但有点麻烦。我之所以这样做,是因为我没有听说过分块。下次我重新打包时,我将尝试重新打包以解决该问题。