问题描述
我在 relevant Python 3.9.2 documentation regarding the synchronization of data among remote processes 上模拟了以下测试程序。不过,据我所知,它实际上并没有用,所以我认为有些事情我不知道。该文档并未明确说明如何在远程进程之间部署 SyncManager 对象,但它们毕竟是 BaseManager 子类的实例,因此必须假设相同的技术应该起作用。
在下面的代码之后是 shell 输出,显示了三个并发调用,大概说明了我遇到的问题。尽管正在与服务器建立连接,但 dict 并未同步。问题是:为什么?
#!/usr/bin/env python3
# <zteeq.py>
import multiprocessing as mp
import multiprocessing.shared_memory as sm
import multiprocessing.managers as mgrs
import os,sys
#######################################################
class CatalogManager( mgrs.SyncManager): pass
CatalogManager.register( 'get_catalog',dict,mgrs.DictProxy)
#######################################################
class ShareCatalog():
#######################################################
def __init__( self,catalogManagerAddress,catalogManagerAuthkey,**kwargs
):
self.catalogManagerAddress = catalogManagerAddress
self.catalogManagerAuthkey = catalogManagerAuthkey
#######################################################
def start( self):
self.catalogManager = CatalogManager(
self.catalogManagerAddress,self.catalogManagerAuthkey,)
try:
self.catalogManager.connect()
print( 'connected self.catalogManager')
except ConnectionRefusedError:
catalogManagerServer = self.catalogManager.get_server()
print( 'starting self.catalogManager')
catalogManagerServer.serve_forever()
self.catalog = self.catalogManager.get_catalog()
###
print( 'pid %d: first stop: %r' % ( os.getpid(),str( self.catalog)))
input()
###
if 'streams' not in self.catalog:
print( 'adding streams')
self.catalog[ 'streams'] = {}
###
print( 'pid %d: second stop: %r' % ( os.getpid(),str( self.catalog)))
input()
###
#######################################################
if __name__ == '__main__':
mp.set_start_method( 'spawn')
shareCatalog = ShareCatalog(
( '127.0.1.1',43210),b'abc',)
shareCatalog.start()
#</zteeq.py>
在第一个 shell 中,SyncManager 服务器启动:
# ./zteeq.py
starting self.catalogManager
让它继续运行,我在第二个 shell 中再次启动程序:
# ./zteeq.py
connected self.catalogManager
pid 2486196: first stop: '{}'
adding streams
pid 2486196: second stop: "{'streams': {}}"
到目前为止,一切都很好。我让它继续运行并第三次调用。但是第三次调用对第二次调用所做的事情一无所知;共享字典中没有“流”键:
# ./zteeq.py
connected self.catalogManager
pid 2492338: first stop: '{}'
我错过了什么?
(Python 3.9.2) (Linux 5.10.0-4-amd64 #1 SMP Debian 5.10.19-1 (2021-03-02) x86_64 GNU/Linux)
备注:一般来说,文档似乎假设所有 SyncManager 对象都将通过名为“multiprocessing.Manager()”的快捷方式创建,该快捷方式不提供远程套接字通信的规范。我假设这些对象会被所有将使用它的进程分叉继承,正如我迄今为止发现的所有示例所示。但这不是我想要做的。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)