Dask Cluster 不处理任何数据,一段时间后只是闲置,几周前它工作得很好

问题描述

所以我正在尝试使用 dask 集群并行化该过程。这是我的尝试。

准备集群:

gateway = Gateway(
    address="http://traefik-pangeo-dask-gateway/services/dask-gateway",public_address="https://pangeo.aer-gitlab.com/services/dask-gateway",auth="jupyterhub",)
options = gateway.cluster_options()
options
cluster = gateway.new_cluster(
    cluster_options=options,)
cluster.adapt(minimum=90,maximum=100)
client = cluster.get_client()
cluster
client

然后我有一个函数可以从 S3 加载文件并对其进行处理,然后将其上传回不同的 s3 存储桶。

函数处理GOES数据并从中选择特定区域并将其保存到nc文件然后到S3

def get_records(rec):
    
    d=[rec[-1][0:4],rec[-1][4:6],rec[-1][6:8],rec[-1][9:11],rec[-1][11:13]] 
    
    yr=d[0]
    mo=d[1]
    da=d[2]
    hr=d[3]
    mn=d[4]
    
    ps = s3fs.S3FileSystem(anon=True)

    period = pd.Period(str(yr)+str('-')+str(mo)+str('-')+str(da),freq='D')
    dy=period.dayofyear
    print(dy)

    cc=[7,8,9,10,11,12,13,14,15,16]  #look at the IR channels only for Now
    dy="{0:0=3d}".format(dy)

    # this loop is for 10 different channels
    for c in range(10):
        ch="{0:0=2d}".format(cc[c])

    # opening 2 different time slices of given particular record
        F1=xr.open_dataset(ps.open(ps.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str("{0:0=2d}".format(hr))+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[-2]))[['Rad']]

        F2=xr.open_dataset(ps.open(ps.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str("{0:0=2d}".format(hr))+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[-1]))[['Rad']]
      
    # Selecting data as per given record radiance
        G1 = F1.where((F1.x >= (rec[0]-0.005)) & (F1.x <= (rec[0]+0.005)) & (F1.y >= (rec[1]-0.005)) & (F1.y <= (rec[1]+0.005)),drop=True)
        G2 = F2.where((F2.x >= (rec[0]-0.005)) & (F2.x <= (rec[0]+0.005)) & (F2.y >= (rec[1]-0.005)) & (F2.y <= (rec[1]+0.005)),drop=True)
       
    # Concating 2 time slices togethere
        G = xr.concat([G1,G2],dim  = 'time')

    # Concatiating different channels
        if c == 0:
            T = G    
        else:
            T = xr.concat([T,G],dim = 'channel')

    # Saving into nc file and storing them to S3
    path = rec[-1]+'.nc'
    T.to_netcdf(path)
    fs.put(path,bucket+path)    

使用 dask 并行运行它们。我让所有集群运行 50 个文件一次,然后清除内存并再次运行它以运行下一个 50 个文件

for j in range(0,len(records),50):
   files = []
   for i in range(j,j+50):
        s3_ds = dask.delayed(get_records)(records[i])
        files.append(s3_ds)

    files = dask.compute(*files)
    client.restart()

所以现在的问题是我的集群将处理文件一段时间,就像我有 10 个集群在运行一样,所以过一段时间后,它们会一个一个地停止处理数据并处于理想状态,即使它们还有内存在他们之中。他们不会做任何事情。他们将处理 20-30 个文件,然后什么都不做。所以我尝试一次只提供 20 个文件,然后他们会在 10-12 个文件后停止处理。下面我附上了一些图像,即使它们有内存,它们也是如何理想地放置的。最重要的是,在我运行相同代码之前的几周,它运行得非常好。我现在不知道是什么问题。

enter image description here

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)