问题描述
我正在尝试按照以下代码片段使用多处理来生成复杂的、不可选择的对象:
void removeAll(vector<int>& v,const int& x)
{
int count = 0;
for (vector<int>::iterator it = v.begin(); it < v.end(); it++)
{
if (*it == x && count > 0)
{
v.erase(it);
}
if (*it == x)
{
count++;
}
}
}
我需要更新一个可共享的变量,所以我使用 from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool
class Facility:
def __init__(self):
self.blocks = Manager().list()
def __process_blocks(self,block):
designer = block["designer"]
apply_terrain = block["terrain"]
block_type = self.__block_type_to_string(block["type"])
block = designer.generate_block(block_id=block["id"],block_type=block_type,anchor=Point(float(block["anchor_x"]),float(block["anchor_y"]),float(block["anchor_z"])),pcu_anchor=Point(float(block["pcu_x"]),float(block["pcu_y"]),0),corridor_width=block["corridor"],jb_height=block["jb_connect_height"],min_Boxes=block["min_Boxes"],apply_terrain=apply_terrain)
self.blocks.append(block)
def design(self,apply_terrain=False):
designer = FacilityBuilder(string_locator=self._string_locator,string_router=self._string_router,Box_router=self._Box_router,sorter=self._sorter,tracker_configurator=self._tracker_configurator,config=self._config)
blocks = [block.to_dict() for index,block in self._store.get_blocks().iterrows()]
for block in blocks:
block["designer"] = designer
block["terrain"] = apply_terrain
with ProcessingPool() as pool:
pool.map(self.__process_blocks,blocks)
初始化一个类级变量,如下所示:
multiprocessing.Manager
这给我留下了以下错误(仅部分堆栈跟踪):
self.blocks = Manager().list()
作为最后的手段,我尝试使用 File "C:\Users\Paul.Nel\Documents\repos\autopV\.autopv\lib\site-packages\dill\_dill.py",line 481,in load
obj = StockUnpickler.load(self)
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\managers.py",line 933,in RebuildProxy
return func(token,serializer,incref=incref,**kwds)
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\managers.py",line 783,in __init__
self._incref()
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\managers.py",line 837,in _incref
conn = self._Client(self._token.address,authkey=self._authkey)
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\connection.py",line 513,in Client
answer_challenge(c,authkey)
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\connection.py",line 764,in answer_challe
nge
raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected
的标准 python
实现来尝试规避 ThreadPool
问题,但这也不太顺利。我已经阅读了许多类似的问题,但还没有找到解决这个特定问题的方法。是 pickle
的问题还是 dill
与 pathos
的接口方式的问题?
编辑:所以我设法用示例代码复制了它,如下所示:
mulitprocessing.Manager
添加 import os
import math
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool
class MyComplex:
def __init__(self,x):
self._z = x * x
def me(self):
return math.sqrt(self._z)
class Starter:
def __init__(self):
manager = Manager()
self.my_list = manager.list()
def _f(self,value):
print(f"{value.me()} on {os.getpid()}")
self.my_list.append(value.me)
def start(self):
names = [MyComplex(x) for x in range(100)]
with ProcessingPool() as pool:
pool.map(self._f,names)
if __name__ == '__main__':
starter = Starter()
starter.start()
时发生错误。
解决方法
所以我已经解决了这个问题。如果像 mmckerns 这样的人或在多处理方面比我更了解的其他人可以评论为什么这是一个解决方案,我仍然会很棒。
问题似乎是在 Manager().list()
中声明了 __init__
。以下代码可以正常工作:
import os
import math
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool
class MyComplex:
def __init__(self,x):
self._z = x * x
def me(self):
return math.sqrt(self._z)
class Starter:
def _f(self,value):
print(f"{value.me()} on {os.getpid()}")
return value.me()
def start(self):
manager = Manager()
my_list = manager.list()
names = [MyComplex(x) for x in range(100)]
with ProcessingPool() as pool:
my_list.append(pool.map(self._f,names))
print(my_list)
if __name__ == '__main__':
starter = Starter()
starter.start()
这里我声明了 list
本地的 ProcessingPool
操作。如果我愿意,我可以在之后将结果分配给一个类级别的变量。