问题描述
我有两个队列:
当我运行rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate -u admin -p admin
时,可以看到相同的内容:
我得到:
+-------+-------------------------+-------------------------+----------+------------------------------------+
| vhost | name | node | messages | message_stats.publish_details.rate |
+-------+-------------------------+-------------------------+----------+------------------------------------+
| / | high_priority | rabbit@server-rabbitmq | 5 | 0.0 |
| / | high_priority_secondary | rabbit@server-rabbitmq | 0 | 0.0 |
+-------+-------------------------+-------------------------+----------+------------------------------------+
我的交流(rabbitmqadmin -V / list exchanges -u admin -p admin
)如下:
+-------------------------+---------+
| name | type |
+-------------------------+---------+
| | direct |
| amq.direct | direct |
| amq.fanout | fanout |
| amq.headers | headers |
| amq.match | headers |
| amq.rabbitmq.trace | topic |
| amq.topic | topic |
| high_priority | direct |
| high_priority_secondary | direct |
| low_priority | direct |
+-------------------------+---------+
队列和整个相关逻辑在PHP / Symfony中实现,无论如何,我想使用rabbitmqadmin
或rabbitmqctl
命令使用本地逻辑(如果可能)在终端中。
如果high_priority
上的消息失败,我希望RabbitMQ将其自动移动到high_priority_secondary
队列中而无需任何PHP。这可能吗?我已经开始阅读有关Dead Letter Exchanges的内容,但不确定如何处理。
我已经为辅助队列创建了使用者,因此消息一旦移到那里,便会得到处理。
是否只能在CLI中实现?
FYI:SO上有一些建议的帖子已经涵盖了这个问题,但是没有一个解决方案完全是CLI的。
解决方法
high_priority_secondary
队列应绑定到high_priority_secondary
交换。
high_priority
队列应绑定到high_priority
交换,并应使用x-dead-letter-exchange = high_priority_secondary
声明。
因此,应该使用无效信件交换声明队列。
要对此进行测试,当您从high_priority
队列中使用带有重新排队的消息时,只需拒绝该消息即可。
好的,虽然我不必修改任何PHP代码,但是我确实必须在框架级别上更改yaml
配置,因为我希望我的解决方案能够持久存在并成为代码库的一部分。
在您的app/config/services/rabbitmq.yaml
中:
定义生产者:
high_priority:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'high_priority'
type: direct
high_priority_secondary:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'high_priority_secondary'
type: direct
message_hospital:
connection: default
class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
exchange_options:
name: 'message_hospital'
type: direct
定义消费者:
high_priority:
connection: default
exchange_options:
name: 'high_priority'
type: direct
queue_options:
name: 'high_priority'
arguments:
x-dead-letter-exchange: ['S','high_priority_secondary']
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
high_priority_secondary:
connection: default
exchange_options:
name: 'high_priority_secondary'
type: direct
queue_options:
name: 'high_priority_secondary'
arguments:
x-dead-letter-exchange: ['S','message_hospital']
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
message_hospital:
connection: default
exchange_options:
name: 'message_hospital'
type: direct
queue_options:
name: 'message_hospital'
qos_options:
prefetch_size: 0
prefetch_count: 1
global: false
callback: foo.task_bus.consumer
现在队列如下:
由于DLX属性,消息在先前的消息中失败后便立即进入医院队列。