将netcdfs转换为zarr并重新分块时,CPU使用率较低

问题描述

我正在将数据从netcdf传输并重新分配给zarr。该过程很慢,并且没有使用很多cpu。我尝试了几种不同的配置,有时似乎做得更好,但是效果不佳。有没有人有任何技巧可以使运行效率更高?

最后一次尝试(以及先前尝试的一些,也许是全部)(在单机,分布式调度程序和使用线程的情况下),日志给出了以下消息:

distributed.core-信息-事件循环在Worker中无响应10.05s。这通常是由于长时间运行的GIL保持功能或移动大数据块引起的。

CPU utilization. It starts low,and then drops even lower.

以前我在内存耗尽方面存在错误,因此我使用下面的“ stepwise_to_zarr”函数来逐段编写zarr:

def stepwise_to_zarr(dataset,step_dim,step_size,chunks,out_loc,group):
    start = dataset[step_dim].min()
    end = dataset[step_dim].max()
    iis = np.arange(start,end,step_size)
    if end > iis[-1]:
          iis = np.append(iis,end)
            
    lon=dataset.get_index(step_dim)
    
    first = True
    failures = []
    for i in range(1,len(iis)):
        lower,upper = (iis[i-1],iis[i])
        if upper >= end:
            lon_list= [l for l in lon if lower <= l <= upper]

        else:
            lon_list= [l for l in lon if lower <= l < upper]
        sub = dataset.sel(longitude=lon_list)

        rechunked_sub = sub.chunk(chunks)
        write_sync=zarr.ThreadSynchronizer()

        if first:
            rechunked_sub.to_zarr(out_loc,group=group,consolidated=True,synchronizer=write_sync,mode="w")
            first = False
        else:
            rechunked_sub.to_zarr(out_loc,append_dim=step_dim)


chunks = {'time':8760,'latitude':21,'longitude':20}
ds = xr.open_mfdataset("path to data",parallel=True,combine="by_coords")
stepwise_to_zarr(ds,step_size=10,step_dim="longitude",chunks=chunks,out_loc="path to output",group="group name")

在上面的图中,利用率从〜6%下降至〜0.5%似乎与完成的第一个10度纬度“批次”相符。

背景信息:

  • 我正在使用一个32个vcpu和256 GB内存的GCE实例。
  • 数据约为600 GB,分布在约150个netcdf文件中。
  • 数据在GCS中,我正在使用Cloud Storage FUSE读取和写入数据。
  • 我正在将数据从块大小:{'time':1,'latitude':521,'longitude':1440}重分配为chunksizes:{'time':8760,'latitude':21,'longitude' :20}

我尝试过:

export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

如最佳做法中所述(https://docs.dask.org/en/latest/array-best-practices.html?highlight=export#avoid-oversubscribing-threads

解决方法

最后,我通过写入带有以下块的中间Zarr存储解决了我的问题:{'time':8760,'latitude':260,'longitude':360}。即使CPU的资源仅在工作的一小部分中得到了充分利用,这种处理仍然非常迅速。然后,使用问题中描述的逐步过程的修改版,阅读此中间zarr并将其存储在最终分块中。尽管性能不理想,但性能令人满意。

CPU utilization when writing to intermediate store

CPU utilization when writing from intermediate to final store

代码如下:

def stepwise_to_zarr(dataset,step_dim,step_size,encoding,out_loc,group,include_end=True):
    start = dataset[step_dim].min()
    end = dataset[step_dim].max()
    iis = np.arange(start,end,step_size)
    if end > iis[-1]:
          iis = np.append(iis,end)

    lon=dataset.get_index(step_dim)
    first = True
    failures = []
    for i in range(1,len(iis)):
        lower,upper = (iis[i-1],iis[i])
        if upper >= end and include_end:
            lon_list= [l for l in lon if lower <= l <= upper]
        else:
            lon_list= [l for l in lon if lower <= l < upper]
    
        sub = dataset.sel(longitude=lon_list)
        write_sync=zarr.ThreadSynchronizer()
        if first:
            sub_write=sub.to_zarr(output_loc,group=varname,consolidated=True,synchronizer=write_sync,encoding=encoding,mode="w",compute=False)
            first = False
        else:
            sub_write=sub.to_zarr(output_loc,append_dim=step_dim,compute=False)
        sub_write.compute(retries=2)

z = xr.open_zarr(input_loc,group=groupname)
new_chunks = {'time':8760,'latitude':21,'longitude':20}
z_rechunked=z.chunk(new_chunks)

#Workaround to avoid:NotImplementedError: Specified zarr chunks (8760,260,360) would #overlap multiple dask chunks
#See https://github.com/pydata/xarray/issues/2300
encoding = {}
for v in ['var1','var2','var3']:
    encoding.update({v:z[v].encoding.copy()})
    encoding[v]["chunks"]=(96408,21,20)

stepwise_to_zarr(z_rechunked,"longitude",60,output_loc,group=groupname)

请注意,我必须重写编码才能重新编组。

此过程有效,但有点麻烦。我之所以这样做,是因为我没有听说过分块。下次我重新打包时,我将尝试重新打包以解决该问题。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...