问题描述
所以我正在尝试使用 dask 集群并行化该过程。这是我的尝试。
准备集群:
gateway = Gateway(
address="http://traefik-pangeo-dask-gateway/services/dask-gateway",public_address="https://pangeo.aer-gitlab.com/services/dask-gateway",auth="jupyterhub",)
options = gateway.cluster_options()
options
cluster = gateway.new_cluster(
cluster_options=options,)
cluster.adapt(minimum=90,maximum=100)
client = cluster.get_client()
cluster
client
然后我有一个函数可以从 S3 加载文件并对其进行处理,然后将其上传回不同的 s3 存储桶。
函数处理GOES数据并从中选择特定区域并将其保存到nc
文件然后到S3
:
def get_records(rec):
d=[rec[-1][0:4],rec[-1][4:6],rec[-1][6:8],rec[-1][9:11],rec[-1][11:13]]
yr=d[0]
mo=d[1]
da=d[2]
hr=d[3]
mn=d[4]
ps = s3fs.S3FileSystem(anon=True)
period = pd.Period(str(yr)+str('-')+str(mo)+str('-')+str(da),freq='D')
dy=period.dayofyear
print(dy)
cc=[7,8,9,10,11,12,13,14,15,16] #look at the IR channels only for Now
dy="{0:0=3d}".format(dy)
# this loop is for 10 different channels
for c in range(10):
ch="{0:0=2d}".format(cc[c])
# opening 2 different time slices of given particular record
F1=xr.open_dataset(ps.open(ps.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str("{0:0=2d}".format(hr))+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[-2]))[['Rad']]
F2=xr.open_dataset(ps.open(ps.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str("{0:0=2d}".format(hr))+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[-1]))[['Rad']]
# Selecting data as per given record radiance
G1 = F1.where((F1.x >= (rec[0]-0.005)) & (F1.x <= (rec[0]+0.005)) & (F1.y >= (rec[1]-0.005)) & (F1.y <= (rec[1]+0.005)),drop=True)
G2 = F2.where((F2.x >= (rec[0]-0.005)) & (F2.x <= (rec[0]+0.005)) & (F2.y >= (rec[1]-0.005)) & (F2.y <= (rec[1]+0.005)),drop=True)
# Concating 2 time slices togethere
G = xr.concat([G1,G2],dim = 'time')
# Concatiating different channels
if c == 0:
T = G
else:
T = xr.concat([T,G],dim = 'channel')
# Saving into nc file and storing them to S3
path = rec[-1]+'.nc'
T.to_netcdf(path)
fs.put(path,bucket+path)
使用 dask 并行运行它们。我让所有集群运行 50 个文件一次,然后清除内存并再次运行它以运行下一个 50 个文件
for j in range(0,len(records),50):
files = []
for i in range(j,j+50):
s3_ds = dask.delayed(get_records)(records[i])
files.append(s3_ds)
files = dask.compute(*files)
client.restart()
所以现在的问题是我的集群将处理文件一段时间,就像我有 10 个集群在运行一样,所以过一段时间后,它们会一个接一个地停止处理数据并处于理想状态,即使它们还有内存在他们之中。他们不会做任何事情。他们将处理 20-30 个文件,然后什么都不做。所以我尝试一次只提供 20 个文件,然后他们会在 10-12 个文件后停止处理。下面我附上了一些图像,即使它们有内存,它们也是如何理想地放置的。最重要的是,在我运行相同代码之前的几周,它运行得非常好。我现在不知道是什么问题。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)