Getting "ConnectionError: Error 10054 ..." just from having Ray parallel processing touch a dataframe

Problem description

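Every time my Ray parallel processing interacts with one specific dataframe, I get an error. What's really confusing is that if I make another dataframe of the same shape and size and fill it with random strings, I don't get the error and everything works fine. Even more confusing, if I copy the dataframe in any way (copy.deepcopy(), building a new df with a loop, making a copy of a copy of the error-prone one, etc.), I still get the error. I even converted the dataframe to different file types, in case it somehow had a reading problem. I even tested it on a computer with 128 GB of RAM, but that didn't fix anything either (talk about desperate). All my research on this error code keeps leading me to network-related solutions, but this is an entirely local program.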

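The lowest-level error: "ConnectionError: Error 10054 while writing to socket. An existing connection was forcibly closed by the remote host." (I'll put the rest of the error at the end.)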

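What is it about this file that Ray somehow hates once it becomes a dataframe Python object? It's a .feather file, about 10 GB, 22928487 rows × 21 columns, but before using Ray I shrink it to 16593943 rows × 7 columns. Either way, I still get the error. In fact, even if I reduce the parallel workload on the file, it still gives me the error. Running this code takes a lot of memory because the dataframe is so large: I have 64 GB of RAM, and running it makes my computer use about 45 GB. Just sitting idle after reading the file (before using Ray), the computer uses about 30 GB of RAM in total. I have no problems with the feather dataframe other than Ray (parallel processing) hating it. I also don't get any RAM errors, except when I run this code on another computer with 32 GB or 16 GB of RAM.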

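I've spent more than 2 weeks trying to fix this, so I would really appreciate your help. I've done my best to reduce the code so that anyone can plug it in more easily. I'm worried that reproducing it may be difficult, because I can't give you the file: it's too big and also proprietary. Honestly, this could also be a permissions error, but I don't know.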

One note: in my experience, the warning Ray prints at startup appears regardless of success or failure. Also, the dashboard is never reachable.

The full error is below:

2021-02-19 16:59:00,436 INFO services.py:1173 -- View the Ray dashboard at http://127.0.0.1:8265
2021-02-19 17:01:14,967 WARNING worker.py:1034 -- Warning: The remote function __main__.ParallelFeather has size 6235199768 when pickled. It will be stored in Redis, which could cause memory issues. This may mean that its definition uses a large array or other object.

---------------------------------------------------------------------------
ConnectionResetError                      Traceback (most recent call last)
~\anaconda3\lib\site-packages\redis\connection.py in send_packed_command(self,command,check_health)
    705             for item in command:
--> 706                 sendall(self._sock,item)
    707         except socket.timeout:

~\anaconda3\lib\site-packages\redis\_compat.py in sendall(sock,*args,**kwargs)
      8 def sendall(sock,*args,**kwargs):
----> 9     return sock.sendall(*args,**kwargs)
     10 

ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

ConnectionError                           Traceback (most recent call last)
<ipython-input-5-a691c5b84f12> in <module>
     33     StartFeather=int((i/processors)*len(DataFrame)/Modifier) #e.g. "1/12 of the column" divided by modifier to further illustrate how this doesn't work
     34     EndFeather=int(((i+1)/processors)*len(DataFrame)/Modifier) # e.g. "2/12 of the column" divided by modifier to further illustrate how this doesn't work
---> 35     (ParallelFeather.remote(Start=StartFeather,End=EndFeather)) # "+" does not work on ray objects with lists...sad
     36     i=i+1
     37 

~\anaconda3\lib\site-packages\ray\remote_function.py in _remote_proxy(*args,**kwargs)
     99         @wraps(function)
    100         def _remote_proxy(*args,**kwargs):
--> 101             return self._remote(args=args,kwargs=kwargs)
    102 
    103         self.remote = _remote_proxy

~\anaconda3\lib\site-packages\ray\remote_function.py in _remote(self,args,kwargs,num_returns,num_cpus,num_gpus,memory,object_store_memory,accelerator_type,resources,max_retries,placement_group,placement_group_bundle_index,placement_group_capture_child_tasks,override_environment_variables,name)
    205 
    206             self._last_export_session_and_job = worker.current_session_and_job
--> 207             worker.function_actor_manager.export(self)
    208 
    209         kwargs = {} if kwargs is None else kwargs

~\anaconda3\lib\site-packages\ray\function_manager.py in export(self,remote_function)
    153                 "collision_identifier": self.compute_collision_identifier(
    154                     function),
--> 155                 "max_calls": remote_function._max_calls
    156             })
    157         self._worker.redis_client.rpush("Exports",key)

~\anaconda3\lib\site-packages\redis\client.py in hset(self,name,key,value,mapping)
   3048                 items.extend(pair)
   3049 
-> 3050         return self.execute_command('HSET',*items)
   3051 
   3052     def hsetnx(self,name,key,value):

~\anaconda3\lib\site-packages\redis\client.py in execute_command(self,*args,**options)
    898         conn = self.connection or pool.get_connection(command_name,**options)
    899         try:
--> 900             conn.send_command(*args)
    901             return self.parse_response(conn,command_name,**options)
    902         except (ConnectionError,TimeoutError) as e:

~\anaconda3\lib\site-packages\redis\connection.py in send_command(self,*args,**kwargs)
    724         "Pack and send a command to the Redis server"
    725         self.send_packed_command(self.pack_command(*args),
--> 726                                  check_health=kwargs.get('check_health',True))
    727 
    728     def can_read(self,timeout=0):

~\anaconda3\lib\site-packages\redis\connection.py in send_packed_command(self,command,check_health)
    716                 errmsg = e.args[1]
    717             raise ConnectionError("Error %s while writing to socket. %s." %
--> 718                                   (errno,errmsg))
    719         except BaseException:
    720             self.disconnect()

ConnectionError: Error 10054 while writing to socket. An existing connection was forcibly closed by the remote host.

Code

import multiprocessing

import ray

print("Replicating ConnectionError 10054 by using ray on a (large) dataframe.")
print("What's crazy is that if I construct a dataframe of equal shape filled with random strings 'foo', 'bar', 'baz' as values, I do not get the error")

ray.shutdown()  # shut down any Ray session left over from a previous run of this cell (install Ray with: %pip install -U ray)
ray.init()      # start a local Ray instance

DataFrame=Feather  # Feather: the DataFrame read from the .feather file, 22928487 rows × 21 columns
@ray.remote
def ParallelFeather(Start,End):
    ThisEncodedFeather=[]
    EncodedWholeInstitutionDictionary={}
    listofChosenDataTypes=["institution_unmatched","domain","domain_m","country","city","edt_id","zip"] #These are column titles of the 21 columns of the dataframe
    row=Start
    while row<End:
        #if type(Feather["institution_unmatched"][row]) is str:
         #   pass
        if type(DataFrame) is str:
            pass
        #    for ChosenDataType in listofChosenDataTypes:
         #       EncodedWholeInstitutionDictionary[ChosenDataType]=Feather[ChosenDataType][row]
          #  ThisEncodedFeather.append(EncodedWholeInstitutionDictionary)
        row+=1
    #return ThisEncodedFeather

EncodedFeathers=[]
processors=multiprocessing.cpu_count() #for me,this is 16
i=0
while i<processors: 
    StartFeather=int((i/processors)*len(DataFrame)) #e.g. "1/16 of the total length" 
    EndFeather=int(((i+1)/processors)*len(DataFrame)) # e.g. "2/16 of the total length"
    (ParallelFeather.remote(Start=StartFeather,End=EndFeather))
    i=i+1

print("If you have made it this far,then the error did not happen")
Errors=True
Errors=False
if not Errors:
    DecodednestedFeathers=ray.get(EncodedFeathers) # gets me "ConnectionError: Error 10054 while writing to socket"
    DecodedFeathers=[]
    for Feathernest in DecodednestedFeathers:
        DecodedFeathers+=Feathernest

ray.shutdown()

DecodedFeathers
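Side note on the chunking loop in the code above: it computes each task's slice with float arithmetic, int((i/processors)*len(DataFrame)). The same contiguous split can be done with integer arithmetic only. A minimal sketch, where chunk_bounds is a hypothetical helper (not part of Ray), using the 16593943 rows and 16 processors mentioned in the question:

```python
def chunk_bounds(n_rows, n_chunks):
    """Split range(n_rows) into n_chunks contiguous (start, end) pairs.

    Hypothetical helper: integer arithmetic (i * n_rows // n_chunks) avoids
    float rounding, and each chunk's end equals the next chunk's start, so
    no row is skipped or processed twice.
    """
    return [(i * n_rows // n_chunks, (i + 1) * n_rows // n_chunks)
            for i in range(n_chunks)]


bounds = chunk_bounds(16593943, 16)
for start, end in bounds[:2]:
    print(start, end)
```

This does not change the ConnectionError by itself; it only makes the slice boundaries exact.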

Solution

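Can you try this? The cause of your problem: ParallelFeather reads the global DataFrame, so Ray pickles the whole DataFrame together with the function definition (that is the 6235199768-byte remote function in the warning) and exports it through Redis, which is exactly where the connection gets reset in the traceback. Putting the DataFrame into the object store once with ray.put() and passing the returned reference to each task as an argument keeps the function small; Ray hands each task the actual DataFrame.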

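import multiprocessing

import ray

# Assumes ray.init() has already been called and Feather is the DataFrame
# read from the .feather file, as in the question.
# ray.put() stores the DataFrame in Ray's object store once and returns a reference.
DataFrameRef=ray.put(Feather) #22928487 rows × 21 columns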
@ray.remote
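def ParallelFeather(DataFrame,Start,End):
    # DataFrame is now a parameter: Ray resolves DataFrameRef to the stored
    # DataFrame before the task runs, so the function no longer captures it.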
    ThisEncodedFeather=[]
    EncodedWholeInstitutionDictionary={}
    ListOfChosenDataTypes=["institution_unmatched","domain","domain_m","country","city","edt_id","zip"] #These are column titles of the 21 columns of the dataframe
    row=Start
    while row<End:
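        # Read from the DataFrame argument, not the global Feather: referencing the
        # global would make Ray pickle the whole frame into the function again.
        #if type(DataFrame["institution_unmatched"][row]) is str:
         #   pass
        if type(DataFrame) is str:
            pass
        #    for ChosenDataType in ListOfChosenDataTypes:
         #       EncodedWholeInstitutionDictionary[ChosenDataType]=DataFrame[ChosenDataType][row]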
          #  ThisEncodedFeather.append(EncodedWholeInstitutionDictionary)
        row+=1
    #return ThisEncodedFeather

EncodedFeathers=[]
processors=multiprocessing.cpu_count() #for me,this is 16
i=0
while i<processors: 
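    StartFeather=int((i/processors)*len(Feather)) #e.g. "1/16 of the total length"
    EndFeather=int(((i+1)/processors)*len(Feather)) # e.g. "2/16 of the total length"
    # keep each task's result reference so ray.get() can collect it
    EncodedFeathers.append(ParallelFeather.remote(DataFrameRef,Start=StartFeather,End=EndFeather))
    i=i+1

DecodednestedFeathers=ray.get(EncodedFeathers)  # waits for all tasks; each returns None until the return line is uncommented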