如何让pika首先在优先级队列上收听?

问题描述

我有2个通过Rabbitmq进行通信的微服务,我需要实现优先级消息。

一个微服务充当发布者,以symfony + Messenger(amqp传输)编写。

第二个微服务充当消费者,用python + pika编写。

Messenger文档(https://symfony.com/doc/current/messenger.html#prioritized-transports)建议针对不同的消息优先级使用单独的队列,该组件无法使用Rabbitmq的内置功能来对消息进行优先级排序。实际上,发布者没有任何问题,我对其进行了配置,以使必要的消息进入优先级队列。

消费者遇到了问题,我无法让pika首先读取优先级队列,然后再读取常规队列。

这是我的Messenger组件配置的一个示例:

framework:
    messenger:
        transports:
            priority:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    exchange:
                        name: priority
                    queues:
                        priority: ~
            normal:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    exchange:
                        name: normal
                    queues:
                        normal: ~
        routing:
            'App\Message\PriorityRequest': priority
            'App\Message\normalRequest': normal

这是我填写队列的方式:

for ($i = 0; $i < 10; $i++) {
    $bus->dispatch(new PriorityRequest($i,'priority'));
    $bus->dispatch(new normalRequest($i,'normal'));
}

这是python + pika中消费者实现的示例:

import pika
import os

def do_work(self,connection,channel,delivery_tag,body):
   print(body)


parameters = pika.URLParameters(os.getenv('MESSENGER_TRANSPORT_DSN'))
connection = pika.BlockingConnection(parameters)

channel = connection.channel()
channel.basic_qos(prefetch_count=1)

channel.queue_declare(queue='priority',durable=True)
channel.queue_declare(queue='normal',durable=True)

channel.basic_consume(queue='priority',on_message_callback=do_work,auto_ack=True)
channel.basic_consume(queue='normal',auto_ack=True)

channel.start_consuming()

如果运行消费者代码,则会得到以下输出

{'id': 0,'data': 'priority'}
{'id': 0,'data': 'normal'}
{'id': 1,'data': 'priority'}
{'id': 1,'data': 'normal'}
{'id': 2,'data': 'priority'}
{'id': 2,'data': 'normal'}
{'id': 3,'data': 'priority'}
{'id': 3,'data': 'normal'}
{'id': 4,'data': 'priority'}
{'id': 4,'data': 'normal'}
{'id': 5,'data': 'priority'}
{'id': 5,'data': 'normal'}
{'id': 6,'data': 'priority'}
{'id': 6,'data': 'normal'}
{'id': 7,'data': 'priority'}
{'id': 7,'data': 'normal'}
{'id': 8,'data': 'priority'}
{'id': 8,'data': 'normal'}
{'id': 9,'data': 'priority'}
{'id': 9,'data': 'normal'}

消息以FIFO顺序处理,我如何强制pika首先处理优先级队列中的消息,并且仅当优先级队列为空时才进入普通队列?

解决方法

开箱即用的Pika不支持此功能。

一个选择是首先从优先级队列中/* styles to override swiper */ .swiper-container { /* change container to overflow visible */ overflow: visible; /* add enough padding to account for the shadow */ padding: 5px; } .my-swipe-wrapper { /* add the overflow to the outer wrapper */ overflow: hidden; /* add negative margin to counteract the padding */ margin: -5px; } 。当队列为空时,取消该使用者,然后从另一个队列中取消basic_consume。完成该工作后,重复并返回到优先级队列。


注意: RabbitMQ团队监视basic_consume mailing list,并且有时仅在StackOverflow上回答问题。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...