TypeError: cannot pickle '_thread.lock' object with Dask compute

Problem description

I am trying to use Dask for multiprocessing. I have a function that needs to run on 10,000 files and produce a file as output for each of them. The function takes a file from an S3 bucket as input and also uses another file from S3 with a similar date and time. I am doing everything in JupyterLab.

Here is my function:

import gzip
import zipfile

import numpy as np
import pandas as pd
import s3fs
import xarray as xr

def get_temp(file, name):
    
    d=[name[0:4],name[4:6],name[6:8],name[9:11],name[11:13]] 
    f_zip = gzip.decompress(file)
    
    yr=d[0]
    mo=d[1]
    da=d[2]
    hr=d[3]
    mn=d[4]
    
    fs = s3fs.S3FileSystem(anon=True)

    period = pd.Period(str(yr)+str('-')+str(mo)+str('-')+str(da),freq='D')
    # period.dayofyear
    dy=period.dayofyear

    cc=[7,8,9,10,11,12,13,14,15,16]  #look at the IR channels only for Now
    dat = xr.open_dataset(f_zip)
    dd=dat[['recNum','trackLat','trackLon','temp']]
    dd=dd.to_dataframe()
    dd = dd.dropna()
    dd['num'] = np.arange(len(dd))
    
    l=dd.where((dd.trackLat>-50.0) & (dd.trackLat<50.0) & (dd.trackLon>-110.0) & (dd.trackLon<10.0))
    l = l.dropna() 
    l = l.reset_index(drop=True)  # reset_index returns a new frame; assign it back
    
    dy="{0:0=3d}".format(dy)
    
    # opening GOES data from S3
    F=xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C07'+'*')[int(mn)//15]))
    
    # converting lat/lon to GOES fixed-grid scan angles
    req=F['goes_imager_projection'].semi_major_axis
    oneovf=F['goes_imager_projection'].inverse_flattening
    rpol=F['goes_imager_projection'].semi_minor_axis 
    e = 0.0818191910435
    sat_h=F['goes_imager_projection'].perspective_point_height

    H=req+sat_h
    gc=np.deg2rad(F['goes_imager_projection'].longitude_of_projection_origin)

    phi=np.deg2rad(l.trackLat.values)
    gam=np.deg2rad(l.trackLon.values)

    phic=np.arctan((rpol**2/req**2)*np.tan(phi))
    rc=rpol/np.sqrt((1-e**2*np.cos(phic)**2))
    sx=H-rc*np.cos(phic)*np.cos(gam-gc)
    sy=-rc*np.cos(phic)*np.sin(gam-gc)
    sz=rc*np.sin(phic)

    yy=np.arctan(sz/sx)
    xx=np.arcsin(-sy/(np.sqrt(sx**2+sy**2+sz**2)))
    
    for i in range(len(xx)):
        for c in range(len(cc)):
            ch = "{0:0=2d}".format(cc[c])
            
            F1=xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[0]))

            F2=xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str("{0:0=2d}".format(hr))+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[-1]))
           
            G1 = F1.where((F1.x >= (xx[i]-0.005)) & (F1.x <= (xx[i]+0.005)) & (F1.y >= (yy[i]-0.005)) & (F1.y <= (yy[i]+0.005)),drop=True)
            G2 = F2.where((F2.x >= (xx[i]-0.005)) & (F2.x <= (xx[i]+0.005)) & (F2.y >= (yy[i]-0.005)) & (F2.y <= (yy[i]+0.005)),drop=True)
            
            G = xr.concat([G1,G2],dim  = 'time')
            G = G.assign_coords(channel=(ch))
            
            if c == 0:
                T = G    
            else:
                T = xr.concat([T,G],dim = 'channel')
                
        T = T.assign_coords(temp=(str(l['temp'][i])))
        print(l.iloc[i]['num'])
        path = name+'_'+str(int(l.iloc[i]['num']))+'.nc'          
        T.to_netcdf(path)
    
        # zipping the file
        with zipfile.ZipFile(name+'_'+str(int(l.iloc[i]['num']))+'.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
            zf.write(path, arcname=str(name+'_'+str(int(l.iloc[i]['num']))+'.nc'))

        # storing it to S3 (note: `s3` and `BUCKET` are globals used from outside this function)
        s3.Bucket(BUCKET).upload_file(path[:-3]+'.zip', "Output/" + path[:-3]+'.zip')
         

This is how I pull the data from S3:

import boto3

s3 = boto3.resource('s3')
s3client = boto3.client('s3', region_name='us-east-1')

bucketname = s3.Bucket('temp')
filedata = []
keys = []
names = []

for my_bucket_object in bucketname.objects.all():
    keys.append(my_bucket_object.key)

for i in range(1, 21):
    fileobj = s3client.get_object(Bucket='temp', Key=keys[i])
    filedata.append(fileobj['Body'].read())
    names.append(keys[i][10:-3])

Initially, I am just trying to run 20 files as a test.

Here is how I create the Dask delayed tasks and compute them:

temp_files = []

for i in range(20):
    s3_ds = dask.delayed(get_temp)(filedata[i],names[i])
    temp_files.append(s3_ds)

temp_files = dask.compute(*temp_files)

Here is the full error log:

distributed.protocol.pickle - INFO - Failed to serialize <function get_temp at 0x7f20a9cb8550>. Exception: cannot pickle '_thread.lock' object
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
   3319         with _cache_lock:
-> 3320             result = cache_dumps[func]
   3321     except KeyError:

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in __getitem__(self,key)
   1572     def __getitem__(self,key):
-> 1573         value = super().__getitem__(key)
   1574         self.data.move_to_end(key)

/srv/conda/envs/notebook/lib/python3.8/collections/__init__.py in __getitem__(self,key)
   1009             return self.__class__.__missing__(self,key)
-> 1010         raise KeyError(key)
   1011     def __setitem__(self,key,item): self.data[key] = item

KeyError: <function get_temp at 0x7f20a9cb8550>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x,buffer_callback,protocol)
     52                 buffers.clear()
---> 53                 result = cloudpickle.dumps(x,**dump_kwargs)
     54         elif not _always_use_pickle_for(x) and b"__main__" in result:

/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj,protocol,buffer_callback)
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()

/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self,obj)
    562         try:
--> 563             return Pickler.dump(self,obj)
    564         except RuntimeError as e:

TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-77-fa46004f5919> in <module>
----> 1 temp_files = dask.compute(*temp_files)

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(*args,**kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk,keys,**kwargs)
    453     return repack([f(r,*a) for r,(f,a) in zip(results,postcomputes)])
    454 

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in get(self,dsk,restrictions,loose_restrictions,resources,sync,asynchronous,direct,retries,priority,fifo_timeout,actors,**kwargs)
   2703         Client.compute: Compute asynchronous collections
   2704         """
-> 2705         futures = self._graph_to_futures(
   2706             dsk,
   2707             keys=set(flatten([keys])),

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, user_priority, actors)
   2639                 {
   2640                     "op": "update-graph",
-> 2641                     "tasks": valmap(dumps_task, dsk),
   2642                     "dependencies": dependencies,
   2643                     "keys": list(map(tokey, keys)),

/srv/conda/envs/notebook/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_task(task)
   3356             return d
   3357         elif not any(map(_maybe_complex,task[1:])):
-> 3358             return {"function": dumps_function(task[0]),"args": warn_dumps(task[1:])}
   3359     return to_serialize(task)
   3360 

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
   3320             result = cache_dumps[func]
   3321     except KeyError:
-> 3322         result = pickle.dumps(func,protocol=4)
   3323         if len(result) < 100000:
   3324             with _cache_lock:

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x,protocol)
     58         try:
     59             buffers.clear()
---> 60             result = cloudpickle.dumps(x,**dump_kwargs)
     61         except Exception as e:
     62             logger.info("Failed to serialize %s. Exception: %s",x,e)

/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj,buffer_callback)
     71                 file,protocol=protocol,buffer_callback=buffer_callback
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()
     75 

/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self,obj)
    561     def dump(self,obj):
    562         try:
--> 563             return Pickler.dump(self,obj)
    564         except RuntimeError as e:
    565             if "recursion" in e.args[0]:

TypeError: cannot pickle '_thread.lock' object

Can someone help me here and tell me what I am doing wrong? Apart from Dask, is there any other way to do this kind of parallel processing?

Update: I have found that it only throws this error when I upload the files to the S3 bucket; otherwise it works fine. But if I don't save the files to S3, I cannot figure out where they end up. When I run the function through Dask, it saves the files somewhere I cannot locate. I am running my code in JupyterLab, and nothing is saved in any directory.

Solution

It took me a while to parse your code.

In the big function, you use s3fs to interact with your cloud storage, and it works well with xarray.

However, in your main code you use boto3 to list and open the S3 files. Those objects hold references to the client object, which maintains a connection pool. That is the thing that cannot be pickled.
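
A minimal sketch of the failure mode (illustrative, not taken from your traceback): pickling a boto3 client or resource, or any function or argument that drags one along with it, fails in the same way Dask's serialization step does.

import pickle

import boto3

s3 = boto3.resource('s3', region_name='us-east-1')   # holds a session and connection pool with thread locks

try:
    pickle.dumps(s3)                 # the same kind of serialization Dask applies to task functions and arguments
except Exception as err:
    print(type(err).__name__, err)   # e.g. TypeError: cannot pickle '_thread.lock' object

Note that get_temp itself also refers to the global boto3 resource s3 when it uploads results, so serializing the function pulls that same unpicklable object along with it, which is most likely why pickling the function fails here.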

s3fs is designed to work with Dask, and it makes sure that filesystem instances and OpenFile objects are picklable. Since you already use it in one part of the code, I would recommend using s3fs throughout (although of course I am biased, since I am the main author).
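
For example, the boto3 listing and reading in your driver code could be replaced with s3fs along these lines (a sketch only, assuming the same 'temp' bucket; fs.ls returns keys prefixed with the bucket name, so the slicing used to build names may need adjusting to your key layout):

import s3fs

fs = s3fs.S3FileSystem()                   # uses your normal AWS credentials

keys = fs.ls('temp')                       # e.g. ['temp/<prefix>/<timestamp>.gz', ...]

filedata = []
names = []
for key in keys[1:21]:                     # same 20-file test subset as above
    filedata.append(fs.cat(key))           # raw (still gzip-compressed) bytes of the object
    names.append(key.split('/')[-1][:-3])  # drop the directory part and the 3-character extension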

Alternatively, you could just pass the file names (as strings) and not open anything until you are inside the worker function. That would be the "best practice" anyway: you should load data inside the worker tasks rather than loading it on the client and passing the data in, as in the sketch below.
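
A rough sketch of that restructuring (the wrapper name and the key-to-name slicing are mine, not from your code; the point is that only strings cross the client/worker boundary and every S3 connection is created inside the task):

import dask
import s3fs

def get_temp_from_key(key):
    # everything that touches S3 is created inside the task
    fs = s3fs.S3FileSystem()
    file_bytes = fs.cat(key)            # the gzip-compressed bytes that get_temp expects
    name = key.split('/')[-1][:-3]      # adjust to however your names are derived from the keys
    return get_temp(file_bytes, name)

fs = s3fs.S3FileSystem()
tasks = [dask.delayed(get_temp_from_key)(key) for key in fs.ls('temp')[1:21]]
results = dask.compute(*tasks)

For this to help, the upload step inside get_temp would also need to create its boto3 resource (or, better, use s3fs) inside the function rather than relying on the global s3; otherwise the same unpicklable object is still attached to the function.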