How to correctly share a Manager dict between processes

Problem

What I want to do is share a dictionary between subclasses of Process, so that when one process updates the dictionary the other is notified to use it. This is illustrated in the code below, where MyProducer starts filling the dictionary and on every iteration sets an event to notify MyConsumer to process the dictionary. Everything works except the part where the dictionary in MyConsumer is empty...

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    increment = 0
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while self.increment < 20:
            self.dictionary[self.increment]=self.increment+10
            self.increment = self.increment + 1
            print("From producer: ", self.dictionary)
            self.event.set()
            # Busy-wait until the consumer has processed the update
            # and cleared the event.
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment + 1

class MyConsumer(Process):
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.dictionary)
            self.event.clear()


if __name__ == "__main__":

    with Manager() as manager:
        state_dict = manager.dict()
        state_ready = Event()
        producerprocess = MyProducer(state_dict, state_ready)
        consumerprocess = MyConsumer(state_dict, state_ready)
        producerprocess.start()
        consumerprocess.start()

Output

Process MyProducer-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "main.py", line 13, in run
    self.dictionary[self.increment]=self.increment+10
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 630, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

Update

My intent is to understand why the dict does not work with Process subclasses. I know you can find plenty of working examples on the internet. In fact, I already have a working solution that simply replaces the dict with a Queue; I want to understand why the dict does not work.

from multiprocessing import Process, Queue, Event

class MyProducer(Process):
    increment = 0
    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        while self.increment < 20:
            self.queue.put([self.increment, self.increment+10])
            self.increment = self.increment + 1
            print("From producer: ", self.queue.qsize())
            self.event.set()
            # Busy-wait until the consumer has processed the update
            # and cleared the event.
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment + 1

class MyConsumer(Process):
    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.queue.qsize())
            self.event.clear()


if __name__ == "__main__":
    state_queue = Queue()
    state_ready = Event()
    producerprocess = MyProducer(state_queue, state_ready)
    consumerprocess = MyConsumer(state_queue, state_ready)
    producerprocess.start()
    consumerprocess.start()

Answer

FYI, I see essentially the same kind of death with this simpler program:

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        print("at producer start", self.val.value)
        self.val.value = 42
        self.event.set()

class MyConsumer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        self.event.wait()
        print("From consumer: ", self.val.value)

if __name__ == "__main__":
    with Manager() as manager:
        state_value = manager.Value('i', 666)
        state_ready = Event()
        producerprocess = MyProducer(state_value, state_ready)
        consumerprocess = MyConsumer(state_value, state_ready)
        producerprocess.start()
        consumerprocess.start()

The implication is that no kind of object obtained from a Manager survives being attached as an attribute to a multiprocessing object that multiprocessing has to construct by magic in a worker process. The information needed to connect to the Manager's server process appears to get lost (a socket on Linux-y systems, a named pipe on Windows).

You could file a bug report, but until that is acted on there is nothing to be done short of rewriting the code to not use a Manager, or to pass the manager objects explicitly to functions.
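
As a minimal sketch of that second option (my illustration, not code from the original post; the produce/consume helpers are made up), the dict proxy can be handed to plain target functions as arguments instead of being stored on a Process subclass, with joins keeping the Manager alive until both workers are done:

from multiprocessing import Process, Manager, Event

def produce(shared, event):
    # `shared` is the Manager's dict proxy, received as an ordinary argument.
    for i in range(20):
        shared[i] = i + 10
    event.set()

def consume(shared, event):
    event.wait()
    print("From consumer: ", dict(shared))

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        ready = Event()
        p = Process(target=produce, args=(shared, ready))
        c = Process(target=consume, args=(shared, ready))
        p.start()
        c.start()
        p.join()
        c.join()  # keep the Manager alive until both workers finish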

A bug report could be resolved in either of two ways: (1) make it "work"; or (2) change the code to raise an exception when such an attempt is made.

Another possibility (which I haven't tried): if you only ever run on Linux, you could skip the __name__ == "__main__" guard and hope that the Manager connection info survives the fork().

EDIT

I opened an issue on the Python project's bug tracker:

https://bugs.python.org/issue41660

WORKAROUND

As it turned out while working through the Python issue report, the "problem" here isn't really with how things are set up, but that your code ignores the need to shut workers down cleanly. Just adding this one line at the end of your code (the dict version - the one you care about):

    producerprocess.join()

is enough so that - now, on my box (Win 10, Python 3.8.5) - it produces the output you expect. It then hangs forever, though, because your consumer never stops waiting on the Event (its .wait() never ends).
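
For clarity, here is the tail of the dict version with that join in place (assembled from the question's code, not copied from the original answer); note that the join stays inside the with Manager() block, so the Manager's server process remains alive while the producer is still using the shared dict:

if __name__ == "__main__":

    with Manager() as manager:
        state_dict = manager.dict()
        state_ready = Event()
        producerprocess = MyProducer(state_dict, state_ready)
        consumerprocess = MyConsumer(state_dict, state_ready)
        producerprocess.start()
        consumerprocess.start()
        producerprocess.join()  # block until the producer has shut down cleanly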

My guess (which I'm 80% sure is right): without the .join(), the main process goes on to run its interpreter-shutdown code (there is nothing else left for it to do!), and that starts forcibly destroying stuff the multiprocessing implementation still needs in order to function correctly.

With the .join(), the main process blocks until the producer is finished - no shutdown code runs during that time, and the .join() explicitly tells the producer process to shut down its (elaborate!) part of the multiprocessing dance cleanly.

It may still leave the consumer process in a damaged state, but if so we will never see any evidence of it, because the consumer is blocked forever in its self.event.wait().

In a real program, you should also do whatever it takes to shut the consumer process down cleanly.
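
One minimal way to do that while keeping the structure of the question's code - my sketch, not part of the original answer, and the answer's preferred restructuring follows under COMPLETE CODE - is to publish a "done" sentinel through the shared dict and join both workers inside the with block:

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    increment = 0
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while self.increment < 20:
            self.dictionary[self.increment] = self.increment + 10
            self.increment = self.increment + 1
            print("From producer: ", self.dictionary)
            self.event.set()
            while self.event.is_set():   # busy-wait until the consumer catches up
                pass
        self.dictionary["done"] = True   # sentinel: no more mutations are coming
        self.event.set()

class MyConsumer(Process):
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.dictionary)
            if self.dictionary.get("done"):
                break                    # producer is finished - exit cleanly
            self.event.clear()

if __name__ == "__main__":
    with Manager() as manager:
        state_dict = manager.dict()
        state_ready = Event()
        producerprocess = MyProducer(state_dict, state_ready)
        consumerprocess = MyConsumer(state_dict, state_ready)
        producerprocess.start()
        consumerprocess.start()
        producerprocess.join()   # wait for both workers to finish before the
        consumerprocess.join()   # Manager shuts down with the `with` block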

COMPLETE CODE

Here is a complete program showing idiomatic Python and best practices for parallel programming: everything shuts down cleanly, there are no "busy loops", no races, and no deadlocks. The implementation of State is more elaborate than this particular problem requires, but it illustrates a powerful approach well worth learning.

import multiprocessing as mp

P, C, F = 1, 2, 4 # bit flags for state values

# Unusual synchronization appears to be wanted here:
# After a producer makes a mutation, it must not make another
# before the consumer acts on it.  So we'll say we're in state
# P when the producer is allowed to mutate, and in state C
# when there's a mutation for the consumer to process.  Another
# state - F (for "finished") - tells the consumer it's time to
# quit. The producer stops on its own when it gets tired of
# mutating ;-)
class State:
    def __init__(self):
        # Initial state is empty - everyone is blocked.
        # Note that we do our own locking around the shared
        # memory, via the condition variable's mutex, so
        # it would be pure waste for the Value to have
        # its own lock too.
        self.state = mp.Value('B', lock=False)
        self.changed = mp.Condition()

    # Wait for state to change to one of the states in the
    # flag mask `what`.  Return the bit flag of the state
    # that succeeded.
    def waitfor(self, what):
        with self.changed:
            while not (self.state.value & what):
                self.changed.wait()
            return self.state.value

    # Force state to (bit flag) `what`, and notify waiters
    # to wake up and see whether it's the state they're
    # waiting for.
    def setwhat(self, what):
        with self.changed:
            self.state.value = what
            self.changed.notify_all()

class Base(mp.Process):
    def __init__(self, dictionary, state):
        super().__init__()
        self.dictionary = dictionary
        self.state = state

class MyProducer(Base):
    def __init__(self, *args):
        super().__init__(*args)
        self.increment = 0

    def run(self):
        while self.increment < 20:
            self.state.waitfor(P)
            self.dictionary[self.increment] = self.increment + 10
            self.state.setwhat(C)
            # Whether the producer or the consumer prints the dict
            # first isn't forced - and, indeed, they can both print at
            # the same time, producing garbled output.  Move the
            # print() above the setwhat(C) to force the producer
            # to print first, if desired.
            print("From producer: ", self.dictionary)
            self.increment += 1

class MyConsumer(Base):
    def run(self):
        while self.state.waitfor(C | F) != F:
            print("From consumer: ", self.dictionary)
            self.state.setwhat(P)

def main():
    with mp.Manager() as manager:
        state_dict = manager.dict()
        state_state = State()
        producerprocess = MyProducer(state_dict, state_state)
        consumerprocess = MyConsumer(state_dict, state_state)
        producerprocess.start()
        consumerprocess.start()

        # The producer is blocked waiting for state P, and the
        # consumer is blocked waiting for state C (or F). The
        # loop here counts down 5 seconds,so you can verify
        # by eyeball that the waits aren't "busy" (they consume
        # essentially no CPU cycles).
        import time
        for i in reversed(range(5)):
            time.sleep(1)
            print(i)

        state_state.setwhat(P) # tell the producer to start!
        producerprocess.join() # and wait for it to finish
        # wait for the consumer to finish eating the last mutation
        state_state.waitfor(P)
        # tell the consumer we're all done
        state_state.setwhat(F)
        consumerprocess.join()    

if __name__ == "__main__":
    main()