为什么pyzmq订阅者与asyncio的行为有所不同?

问题描述

我有一个XPUB / XSUB设备,并且在一个进程中运行着许多模拟发布者。在一个单独的过程中,我想连接一个订户并将接收到的消息打印到终端。下面,我将显示一个简单函数的两个变体来完成此任务。我将这些功能包装为命令行实用程序。

我的问题是asyncio变体从不接收消息

另一方面,非异步变量也可以正常工作。我已经测试了所有情况下的ipc和tcp传输。除了我重新启动发布以更改传输方式外,发布过程在测试中不会更改。这些消息是短字符串,大约每秒发布一次,因此我们没有在研究性能问题。

订户程序无限期地位于msg = await sock.receive_multipart()行。在XPUB / XSUB设备中,我具有检测sock.setsockopt(zmq.SUBSCRIBE,channel.encode())消息转发的工具,与非异步变量连接时相同。

asyncio变体(如描述的那样无效)

def subs(url,channel):
    import asyncio

    import zmq
    import zmq.asyncio

    ctx = zmq.asyncio.Context.instance()
    sock = ctx.socket(zmq.SUB)
    sock.connect(url)
    sock.setsockopt(zmq.SUBSCRIBE,channel.encode())

    async def task():
        while True:
            msg = await sock.recv_multipart()
            print(' | '.join(m.decode() for m in msg))

    try:
        asyncio.run(task())
    finally:
        sock.setsockopt(zmq.LINGER,0)
        sock.close()

常规阻止变体(可以正常运行)

def subs(url,channel):
    import zmq

    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.SUB)
    sock.connect(url)
    sock.setsockopt(zmq.SUBSCRIBE,channel.encode())

    def task():
        while True:
            msg = sock.recv_multipart()
            print(' | '.join(m.decode() for m in msg))

    try:
        task()
    finally:
        sock.setsockopt(zmq.LINGER,0)
        sock.close()

对于此特定工具,无需使用asyncio。但是,我也在代码的其他地方也遇到了这个问题,而异步接收却从未收到。因此,我希望通过在这种简单情况下将其清除,以了解总体上出了什么问题。

我的版本是

import zmq
zmq.zmq_version()  # '4.3.2'
zmq.__version__  # '19.0.2'

我在MacOS 10.13.6上。

我完全没有主意。互联网,请帮忙!

解决方法

一个有效的异步变体是

def subs(url,channel):
    import asyncio

    import zmq
    import zmq.asyncio

    ctx = zmq.asyncio.Context.instance()

    async def task():
        sock = ctx.socket(zmq.SUB)
        sock.connect(url)
        sock.setsockopt(zmq.SUBSCRIBE,channel.encode())

        try:
            while True:
                msg = await sock.recv_multipart()
                print(' | '.join(m.decode() for m in msg))
        finally:
            sock.setsockopt(zmq.LINGER,0)
            sock.close()

    asyncio.run(task())

我得出结论,当使用asyncio zmq时,必须使用在事件循环上运行的调用来创建套接字,从中等待套接字。即使原始形式对事件循环没有任何作用,但套接字似乎具有与asyncio.run使用的事件循环不同的事件循环。我不知道为什么,并且我没有打开pyzmq的问题,因为他们的文档显示的用法与该答案相同,没有评论。

编辑以回应评论:

asyncio.run总是创建一个新的事件循环,因此,显然是为在传递给asyncio.run的协同例程之外实例化的套接字创建的循环(如原始问题中的asyncio变体)不同。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...