使用 pika 异步扭曲的更简单方法?

问题描述

这是我使用 rabbitmq 的第一个项目,我完全迷失了,因为我不确定解决问题的最佳方法是什么。

该程序相当简单,它只是侦听警报事件,然后将这些事件放入rabbitmq 队列中,但我在程序的架构上苦苦挣扎。

如果我为每个事件打开、发布然后关闭连接,我会增加很多延迟,并且会传输不必要的包(甚至比平时更多,因为我使用的是 TLS)...

如果我保持连接打开,并创建一个发布消息的函数(我只使用一个队列,非常基本),我最终会遇到问题,因为多个事件可能同时发生,我的程序将如果与rabbitmq 代理的连接结束,不知道该怎么办。

阅读他们的文档,该解决方案似乎使用了他们的“连接适配器”之一,这对我来说就像手套一样适合我,因为我只是将基本套接字中的所有连接内容重写为使用 Twisted(我真的很喜欢他们的高级方法)。但有一个问题。对于几乎不认为自己是“中级”的人来说,他们的 "basic example" 相当复杂。

一个完美的世界中,我将能够在与“警报服务器”相同的反应器中运行该服务并调用一个方法来发布消息。但我很难理解代码。有没有和鼠兔一起工作过的人可以给我指出更好的方向,甚至告诉我是否有更简单的方法

解决方法

好吧,我会发布对我有用的内容。可能不是最好的选择,但也许它可以帮助遇到同样问题的人。

首先我决定放弃 Twisted 并使用 Asyncio(没什么个人的,我只是想使用它,因为它已经在 python 中),即使 pika 有一个很好的使用异步的例子,我尝试并发现使用 aio_pika 更容易.

我最终得到了 2 个主要功能。一个用于发布者,另一个用于订阅者。 波纹管是我的代码对我有用...

# -*- coding: utf-8 -*-

import asyncio
import aio_pika
from myapp import conf

QUEUE_SEND = []


def add_queue_send(msg):
    """Add MSG to QUEUE

    Args:
        msg (string): JSON
    """
    QUEUE_SEND.append(msg)


def build_url(amqp_user,amqp_pass,virtual_host):
    """Build Auth URL

    Args:
        amqp_user (str): User name
        amqp_pass (str): Password
        virtual_host (str): Virtual Host

    Returns:
        str: AMQP URL
    """
    return ''.join(['amqps://',amqp_user,':','@',conf.get('amqp_host'),'/',virtual_host,'?cafile=',conf.get('ca_cert'),'&keyfile=',conf.get('client_key'),'&certfile=',conf.get('client_cert'),'&no_verify_ssl=0'])


async def process_message(message: aio_pika.IncomingMessage):
    """Read a new message

    Args:
        message (aio_pika.IncomingMessage): Mensagem
    """
    async with message.process():
        #   TODO: Do something with the new message
        await asyncio.sleep(1)


async def consumer(url):
    """Keep listening to a MQTT queue

    Args:
        url (str): URL

    Returns:
        aio_pika.Connection: Conn?
    """
    connection = await aio_pika.connect_robust(url=url)
    # Channel
    channel = await connection.channel()
    # Max concurrent messages?
    await channel.set_qos(prefetch_count=100)
    # Queue
    queue = await channel.declare_queue(conf.get('amqp_queue_client'))
    #   What call when a new message is received
    await queue.consume(process_message)
    #   Returns the connection?
    return connection


async def publisher(url):
    """Send messages from the queue.

    Args:
        url (str): URL de autenticação
    """
    connection = await aio_pika.connect_robust(url=url)
    # Channel
    channel = await connection.channel()
    while True:
        if QUEUE_SEND:
            #   If the list (my queue) is not empty
            msg = aio_pika.Message(body=QUEUE_SEND.pop().encode())
            await channel.default_exchange.publish(msg,routing_key='queue')
        else:
            #   Just wait
            await asyncio.sleep(1)
    await connection.close()

我开始使用``loop.create_task```。

正如我所说。它对我有用(即使我的代码的另一部分仍然有问题),但我不想让这个问题悬而未决,因为大多数人都会遇到同样的问题。

如果您知道更好的方法或更优雅的方法,请分享。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...