回调(callback)在 Python nats.aio.client 的 NATS 订阅中不起作用

问题描述

我正在尝试使用 nats.aio.client 模块进行简单的发布/订阅,但回调不起作用。下面是我的代码:

from nats.aio.client import Client as NATS

class NAT:
    """Small wrapper around a NATS client used for a publish/subscribe demo.

    The wrapped client is stored on ``self.nc``; every coroutine below
    operates on that single client instance.
    """

    def __init__(self):
        # Only the client object is created here; the network connection
        # is opened later by run().
        self.nc = NATS()

    async def run(self):
        """Connect the client to the server at demo.nats.io:4222."""
        print("connection starts")
        await self.nc.connect("demo.nats.io:4222", connect_timeout=10, verbose=True)
        print("connection success")

    async def publish_msg(self):
        """Publish a fixed payload on the "Hello" subject."""
        print("msg to publish")
        await self.nc.publish("Hello", b'Hellowelcome')

    async def subscribe_msg(self):
        """Subscribe to the "Hello" subject with a callback that prints each message."""

        async def message_handler(msg):
            # Invoked by the client for every message delivered on "Hello".
            print("Hello")
            subject = msg.subject
            reply = msg.reply
            print("Received a message on '{subject} {reply}'".format(
                subject=subject, reply=reply))

        await self.nc.subscribe("Hello", cb=message_handler)

调用上述类的脚本文件:

import asyncio
from nats_client import NAT

# Driver: connect, subscribe, then publish. Each coroutine is run to
# completion on the same event loop before the next one is started.
nat = NAT()
nats_connection = asyncio.get_event_loop()
for step in (nat.run, nat.subscribe_msg, nat.publish_msg):
    nats_connection.run_until_complete(step())
# The loop is not closed here, and nothing keeps it running once the
# publish step has returned, so the script ends right after that call.

如果我遗漏了什么,请告诉我,我将不胜感激。

解决方法

我认为您的程序可能退出得太早,因此既无法发布也无法接收消息,以下是有关如何在 NATS 中启动服务的完整示例:

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    """Connect to a NATS server on 127.0.0.1, answer "help" requests and send some.

    After connecting, a handler is subscribed to the "help" subject in the
    "workers" queue group. The coroutine then sends one request per second
    to that same subject and prints either the response or the error.

    Args:
        loop: Event loop object forwarded to the client's ``connect`` call.
    """
    client = NATS()

    async def on_disconnect():
        print("Got disconnected...")

    async def on_reconnect():
        print("Got reconnected...")

    # NOTE(review): -1 presumably means "retry forever", and newer client
    # releases may no longer accept the ``loop`` keyword; confirm both
    # against the installed NATS client version.
    await client.connect(
        "127.0.0.1",
        reconnected_cb=on_reconnect,
        disconnected_cb=on_disconnect,
        max_reconnect_attempts=-1,
        loop=loop,
    )

    async def answer_help(msg):
        # Log the incoming request, then reply on the subject it carries.
        template = "Received a message on '{subject} {reply}': {data}"
        print(template.format(
            subject=msg.subject, reply=msg.reply, data=msg.data.decode()))
        await client.publish(msg.reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    await client.subscribe("help", "workers", answer_help)

    print("Listening for requests on 'help' subject...")
    # Same number of iterations as range(1, 1000000); the index is unused.
    for _ in range(999999):
        await asyncio.sleep(1)
        try:
            print(await client.request("help", b'hi'))
        except Exception as err:
            print("Error:", err)

if __name__ == '__main__':
    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop is set is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run(loop))
        # Keep the loop alive after run() returns so subscriptions that
        # were registered on it can continue to be serviced.
        loop.run_forever()
    finally:
        # Always release the loop's resources, including when the process
        # is interrupted (e.g. Ctrl-C) while the loop is running.
        loop.close()