执行 Dask dekayed 和计算时出现 FileNotFoundError

问题描述

我不熟悉并行处理并询问应用程序。所以我有 1000 个文件要并行运行,所以我使用 dask 计算来执行此操作。我的工人和核心已正确分配。我在 JuputerLab 环境中运行所有东西。

这是我的尝试;

连接到网关:

from dask_gateway import Gateway

gateway = Gateway(
    address=address,public_address=public_address,auth="jupyterhub",)
options = gateway.cluster_options()
options

选择不需要的工人数:

cluster = gateway.new_cluster(
    cluster_options=options,)
cluster.adapt(minimum=10,maximum=50)
client = cluster.get_client()
cluster
client

在这个列表中,我有一堆我的 gzip 文件,里面有 nc 文件

turb=glob.glob('Data/*')

这是我对这些文件执行的函数

def get_turb(file):
    
    name = str(file[5:18])
    d=[file[5:9],file[9:11],file[11:13],file[14:16],file[16:18]] 
    f_zip = gzip.open(file,'rb')
    
    yr=d[0]
    mo=d[1]
    da=d[2]
    hr=d[3]
    mn=d[4]
    
    fs = s3fs.S3FileSystem(anon=True)

    period = pd.Period(str(yr)+str('-')+str(mo)+str('-')+str(da),freq='D')
    # period.dayofyear
    dy=period.dayofyear

    cc=[7,8,9,10,11,12,13,14,15,16]  #look at the IR channels only for Now
    dat = xr.open_dataset(f_zip)

这里我正在做 dask 延迟和计算:

files = []

for grb_file in turb[:20]:
    s3_ds = dask.delayed(get_turb)(grb_file)
    files.append(s3_ds)
    
s3_ds.visualize()
files = dask.compute(*edr_files)

dask 延迟正确生成对象并将其附加到文件,但在运行计算时抛出错误

FileNotFoundError: [Errno 2] No such file or directory: 'Data/20190107_0300.gz'

事实上那里有同名的文件,我已经正确检查了它并且它运行完美而没有正确的dask延迟功能。有人可以指导我我错过了什么或做错了什么。非常感谢您的帮助!

解决方法

听起来工作人员在不同的机器上运行,在这种情况下,文件可能不存在(我无法想象为什么你会使用带有本地集群的 dask gateway,所以我假设这个集群是一个多节点集群)。

您要么需要将文件复制给您的工作人员,或者更好的是,从 S3 加载它们。

在您的代码片段中,您正在创建一个 s3fs 对象,但您根本没有使用它。如果这些文件在 s3 中,您应该调用 fs.open 而不仅仅是 open

要查看您的员工正在查看的内容,您可以执行以下操作

import os
# tells you which directory they're all pointing at
client.run(os.getcwd)
# tells you what's in there
client.run(lambda: os.listdir('./')