zmq 多进程间通信

问题描述


我有 n 个进程,它们拥有自己的本地数据和操作,我希望每个进程将其本地数据的“快照”发送到其余运行的节点进程。
到目前为止,我的代码如下所示:
def node1():
    Process(target=sync_1).start()
    sleep(4)
    data = {'node': 1,'data': 'node 1 data'}

    context_b = zmq.Context()
    socket_b = context_b@R_502_6407@cket(zmq.PUB)

    connnected = False
    try:
        socket_b.bind("tcp://*:%s" % 5560)
        connnected = True
    except Exception as e:
        print(e)
    if connnected:
        topic = "101"
        try:
            socket_b.send_string(topic + ' ' + json.dumps(data))
        except Exception as e:
                print(e)
    socket_b.close()
    context_b.term()

def node2():
    Process(target=sync_2).start()

def sync_1():
    context_c = zmq.Context()
    socket_c = context_c@R_502_6407@cket(zmq.SUB)
    _port = 5560
    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE,topicfilter,encoding='utf-8')

     try:
         raw = socket_c.recv().decode("utf-8")
         json0 = raw.find('{')
         topic = raw[0:json0].strip()
         msg = json.loads(raw[json0:])
         print("[SYNC 1] received {}-{}]".format(topic,msg))
     except Exception as e:
         print(e)

def sync_2():
    context_c = zmq.Context()
    socket_c = context_c@R_502_6407@cket(zmq.SUB)
    _port = 5560

    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE,encoding='utf-8')

    try:
        raw = socket_c.recv().decode("utf-8")
        json0 = raw.find('{')
        topic = raw[0:json0].strip()
        msg = json.loads(raw[json0:])
        print("[SYNC 2] received {}-{}]".format(topic,msg))
    except Exception as e:
        print(e)

if __name__ == '__main__':
    Process(target=node1).start()
    Process(target=node2).start()

每个节点都有一个后台运行的“侦听器”进程(同步功能),以便接收每个节点的数据并相应地使用它,并且当所有子套接字都连接到一个节点(节点1在那种情况)但我希望每个节点都向所有侦听器发送数据,因此我不确定如何实现这一点,因为侦听器进程可以连接到一个端口。

此外,每次有更新时,节点都必须发送本地数据快照,因此这不能是一次性通信,因此我想到了让侦听器进程一直积极等待更新。

我相信一个图表可能对这个问题有用:

enter image description here

可以有更简单的方法解决此问题,因此我们将不胜感激!

解决方法

更新: 解决方案是使用 XPUB-XSUB 模式。
通过使用这种模式,我创建了一个代理线程,允许我做我想做的事。
我能找到的最有用的 Python 示例是 this