问题描述
这是我使用 rabbitmq 的第一个项目,我完全迷失了,因为我不确定解决问题的最佳方法是什么。
该程序相当简单,它只是侦听警报事件,然后将这些事件放入rabbitmq 队列中,但我在程序的架构上苦苦挣扎。
如果我为每个事件打开、发布然后关闭连接,我会增加很多延迟,并且会传输不必要的包(甚至比平时更多,因为我使用的是 TLS)...
如果我保持连接打开,并创建一个发布消息的函数(我只使用一个队列,非常基本),我最终会遇到问题,因为多个事件可能同时发生,我的程序将如果与rabbitmq 代理的连接结束,不知道该怎么办。
阅读他们的文档,该解决方案似乎使用了他们的“连接适配器”之一,这对我来说就像手套一样适合我,因为我只是将基本套接字中的所有连接内容重写为使用 Twisted(我真的很喜欢他们的高级方法)。但有一个问题。对于几乎不认为自己是“中级”的人来说,他们的 "basic example" 相当复杂。
在一个完美的世界中,我将能够在与“警报服务器”相同的反应器中运行该服务并调用一个方法来发布消息。但我很难理解代码。有没有和鼠兔一起工作过的人可以给我指出更好的方向,甚至告诉我是否有更简单的方法?
解决方法
好吧,我会发布对我有用的内容。可能不是最好的选择,但也许它可以帮助遇到同样问题的人。
首先我决定放弃 Twisted 并使用 Asyncio(没什么个人的,我只是想使用它,因为它已经在 python 中),即使 pika 有一个很好的使用异步的例子,我尝试并发现使用 aio_pika 更容易.
我最终得到了 2 个主要功能。一个用于发布者,另一个用于订阅者。 波纹管是我的代码对我有用...
# -*- coding: utf-8 -*-
import asyncio
import aio_pika
from myapp import conf
QUEUE_SEND = []
def add_queue_send(msg):
"""Add MSG to QUEUE
Args:
msg (string): JSON
"""
QUEUE_SEND.append(msg)
def build_url(amqp_user,amqp_pass,virtual_host):
"""Build Auth URL
Args:
amqp_user (str): User name
amqp_pass (str): Password
virtual_host (str): Virtual Host
Returns:
str: AMQP URL
"""
return ''.join(['amqps://',amqp_user,':','@',conf.get('amqp_host'),'/',virtual_host,'?cafile=',conf.get('ca_cert'),'&keyfile=',conf.get('client_key'),'&certfile=',conf.get('client_cert'),'&no_verify_ssl=0'])
async def process_message(message: aio_pika.IncomingMessage):
"""Read a new message
Args:
message (aio_pika.IncomingMessage): Mensagem
"""
async with message.process():
# TODO: Do something with the new message
await asyncio.sleep(1)
async def consumer(url):
"""Keep listening to a MQTT queue
Args:
url (str): URL
Returns:
aio_pika.Connection: Conn?
"""
connection = await aio_pika.connect_robust(url=url)
# Channel
channel = await connection.channel()
# Max concurrent messages?
await channel.set_qos(prefetch_count=100)
# Queue
queue = await channel.declare_queue(conf.get('amqp_queue_client'))
# What call when a new message is received
await queue.consume(process_message)
# Returns the connection?
return connection
async def publisher(url):
"""Send messages from the queue.
Args:
url (str): URL de autenticação
"""
connection = await aio_pika.connect_robust(url=url)
# Channel
channel = await connection.channel()
while True:
if QUEUE_SEND:
# If the list (my queue) is not empty
msg = aio_pika.Message(body=QUEUE_SEND.pop().encode())
await channel.default_exchange.publish(msg,routing_key='queue')
else:
# Just wait
await asyncio.sleep(1)
await connection.close()
我开始使用``loop.create_task```。
正如我所说。它对我有用(即使我的代码的另一部分仍然有问题),但我不想让这个问题悬而未决,因为大多数人都会遇到同样的问题。
如果您知道更好的方法或更优雅的方法,请分享。