是否可以在不锁定的情况下与NamespaceProxy和BaseManager共享复杂的对象?

问题描述

编辑:

我设法“删除”了其中一个锁,但是它仍然很慢。有人知道那个锁在哪里吗?

class NoLock:
    def __init__(self):
        pass
    def __enter__(self):
        return self
    def __exit__(self,foo=None,bar=None,baz=None):
        pass

BaseManager._mutex = NoLock()
BaseProxy._mutex = NoLock()

我知道对于多处理数组,lock=False一个选项,但是对于复杂的对象也可以这样做吗?例如:

class Foo:
    def __init__(self):
        self._a = 1000
    def get_a(self):
        return self._a

class SharedFoo(NamespaceProxy):
    _exposed_ = ('__getattribute__','__getattr__','__setattr__','__init__','get_a')
    def get_a(self):
        callmethod = object.__getattribute__(self,'_callmethod')
        return callmethod('get_a',())
        
class FooManager(BaseManager):
    pass
if __name__ == '__main__':
    FooManager.register('SharedFoo',Foo,SharedFoo)
    with FooManager() as manager:
        for i in range(1000000):
            a = foo.get_a()
    processes = []

运行foo.get_a() 1000000次需要花费几秒钟的时间,这太慢了(我可能必须在实际程序中访问数十亿次)。显然这是由获取和释放锁引起的,是否可以手动管理锁,以便仅在使用某些功能时才能使程序锁?

谢谢

解决方法

我刚刚遇到了您的问题(迟到总比不到好)。首先,我不相信您提供的代码可以运行:foo 没有定义。我也不相信在函数级别上有锁定,如下所示。我已将方法 get_a 修改为 (1) 使用当前正在执行的线程 id 打印出开始消息,(2) 休眠 2 秒,(3) 使用当前线程 id 打印出结束消息和 ( 4) 最后返回它的值。我创建了一个包含 2 个进程的处理池,并向调用方法 SharedFoo 的工作函数并行提交了两次名为 fooget_a 实例。开始和结束消息以及总经过时间清楚地表明对 get_a 的调用不是单线程通过该方法:

from multiprocessing.managers import BaseManager,NamespaceProxy
from multiprocessing.pool import Pool
from threading import get_ident
import time

class Foo:
    def __init__(self):
        self._a = 1000
    def get_a(self):
        print(f'started {get_ident()}')
        time.sleep(2)
        print(f'ended {get_ident()}')
        return self._a

class SharedFoo(NamespaceProxy):
    _exposed_ = ('__getattribute__','__getattr__','__setattr__','__init__','get_a')
    def get_a(self):
        callmethod = object.__getattribute__(self,'_callmethod')
        return callmethod('get_a',())

class FooManager(BaseManager):
    pass


def worker(foo):
    return foo.get_a()


if __name__ == '__main__':
    FooManager.register('SharedFoo',Foo,SharedFoo)
    with FooManager() as manager:
        foo = manager.SharedFoo()
        pool = Pool(2)
        t = time.time()
        pool.apply_async(worker,args=(foo,))
        pool.apply_async(worker,))
        # Wait for tasks to complete:
        pool.close()
        pool.join()
        print(time.time() - t)

打印:

started 22320
started 36068
ended 22320
ended 36068
2.1460039615631104

代理本身很慢。