问题描述
我有2个通过Rabbitmq进行通信的微服务,我需要实现优先级消息。
第一个微服务充当发布者,以symfony + Messenger(amqp传输)编写。
第二个微服务充当消费者,用python + pika编写。
Messenger文档(https://symfony.com/doc/current/messenger.html#prioritized-transports)建议针对不同的消息优先级使用单独的队列,该组件无法使用Rabbitmq的内置功能来对消息进行优先级排序。实际上,发布者没有任何问题,我对其进行了配置,以使必要的消息进入优先级队列。
消费者遇到了问题,我无法让pika首先读取优先级队列,然后再读取常规队列。
这是我的Messenger组件配置的一个示例:
framework:
messenger:
transports:
priority:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: priority
queues:
priority: ~
normal:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: normal
queues:
normal: ~
routing:
'App\Message\PriorityRequest': priority
'App\Message\normalRequest': normal
这是我填写队列的方式:
for ($i = 0; $i < 10; $i++) {
$bus->dispatch(new PriorityRequest($i,'priority'));
$bus->dispatch(new normalRequest($i,'normal'));
}
这是python + pika中消费者实现的示例:
import pika
import os
def do_work(self,connection,channel,delivery_tag,body):
print(body)
parameters = pika.URLParameters(os.getenv('MESSENGER_TRANSPORT_DSN'))
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='priority',durable=True)
channel.queue_declare(queue='normal',durable=True)
channel.basic_consume(queue='priority',on_message_callback=do_work,auto_ack=True)
channel.basic_consume(queue='normal',auto_ack=True)
channel.start_consuming()
{'id': 0,'data': 'priority'}
{'id': 0,'data': 'normal'}
{'id': 1,'data': 'priority'}
{'id': 1,'data': 'normal'}
{'id': 2,'data': 'priority'}
{'id': 2,'data': 'normal'}
{'id': 3,'data': 'priority'}
{'id': 3,'data': 'normal'}
{'id': 4,'data': 'priority'}
{'id': 4,'data': 'normal'}
{'id': 5,'data': 'priority'}
{'id': 5,'data': 'normal'}
{'id': 6,'data': 'priority'}
{'id': 6,'data': 'normal'}
{'id': 7,'data': 'priority'}
{'id': 7,'data': 'normal'}
{'id': 8,'data': 'priority'}
{'id': 8,'data': 'normal'}
{'id': 9,'data': 'priority'}
{'id': 9,'data': 'normal'}
消息以FIFO顺序处理,我如何强制pika首先处理优先级队列中的消息,并且仅当优先级队列为空时才进入普通队列?
解决方法
开箱即用的Pika不支持此功能。
一个选择是首先从优先级队列中/* styles to override swiper */
.swiper-container {
/* change container to overflow visible */
overflow: visible;
/* add enough padding to account for the shadow */
padding: 5px;
}
.my-swipe-wrapper {
/* add the overflow to the outer wrapper */
overflow: hidden;
/* add negative margin to counteract the padding */
margin: -5px;
}
。当队列为空时,取消该使用者,然后从另一个队列中取消basic_consume
。完成该工作后,重复并返回到优先级队列。
注意: RabbitMQ团队监视basic_consume
mailing list,并且有时仅在StackOverflow上回答问题。