RabbitMQ:如何将失败的消息从一个队列移到另一个队列?

问题描述

我有两个队列:

enter image description here

当我运行rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate -u admin -p admin时,可以看到相同的内容

我得到:

+-------+-------------------------+-------------------------+----------+------------------------------------+
| vhost |          name           |          node           | messages | message_stats.publish_details.rate |
+-------+-------------------------+-------------------------+----------+------------------------------------+
| /     | high_priority           | rabbit@server-rabbitmq  | 5        | 0.0                                |
| /     | high_priority_secondary | rabbit@server-rabbitmq  | 0        | 0.0                                |
+-------+-------------------------+-------------------------+----------+------------------------------------+

我的交流(rabbitmqadmin -V / list exchanges -u admin -p admin)如下:

+-------------------------+---------+
|          name           |  type   |
+-------------------------+---------+
|                         | direct  |
| amq.direct              | direct  |
| amq.fanout              | fanout  |
| amq.headers             | headers |
| amq.match               | headers |
| amq.rabbitmq.trace      | topic   |
| amq.topic               | topic   |
| high_priority           | direct  |
| high_priority_secondary | direct  |
| low_priority            | direct  |
+-------------------------+---------+

队列和整个相关逻辑在PHP / Symfony中实现,无论如何,我想使用rabbitmqadminrabbitmqctl命令使用本地逻辑(如果可能)在终端中。

如果high_priority上的消息失败,我希望RabbitMQ将其自动移动到high_priority_secondary队列中而无需任何PHP。这可能吗?我已经开始阅读有关Dead Letter Exchanges内容,但不确定如何处理。

我已经为辅助队列创建了使用者,因此消息一旦移到那里,便会得到处理。

是否只能在CLI中实现?

FYI:SO上有一些建议的帖子已经涵盖了这个问题,但是没有一个解决方案完全是CLI的。

解决方法

high_priority_secondary队列应绑定到high_priority_secondary交换。 high_priority队列应绑定到high_priority交换,并应使用x-dead-letter-exchange = high_priority_secondary声明。

因此,应该使用无效信件交换声明队列。

要对此进行测试,当您从high_priority队列中使用带有重新排队的消息时,只需拒绝该消息即可。 enter image description here

,

好的,虽然我不必修改任何PHP代码,但是我确实必须在框架级别上更改yaml配置,因为我希望我的解决方案能够持久存在并成为代码库的一部分。

在您的app/config/services/rabbitmq.yaml中:

定义生产者:

high_priority:
    connection: default
    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
    exchange_options:
        name: 'high_priority'
        type: direct
high_priority_secondary:
    connection: default
    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
    exchange_options:
        name: 'high_priority_secondary'
        type: direct
message_hospital:
    connection: default
    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer
    exchange_options:
        name: 'message_hospital'
        type: direct

定义消费者:

high_priority:
    connection: default
    exchange_options:
        name: 'high_priority'
        type: direct
    queue_options:
        name: 'high_priority'
        arguments:
            x-dead-letter-exchange: ['S','high_priority_secondary']
    qos_options:
        prefetch_size: 0
        prefetch_count: 1
        global: false
    callback: foo.task_bus.consumer
high_priority_secondary:
    connection: default
    exchange_options:
        name: 'high_priority_secondary'
        type: direct
    queue_options:
        name: 'high_priority_secondary'
        arguments:
            x-dead-letter-exchange: ['S','message_hospital']
    qos_options:
        prefetch_size: 0
        prefetch_count: 1
        global: false
    callback: foo.task_bus.consumer
message_hospital:
    connection: default
    exchange_options:
        name: 'message_hospital'
        type: direct
    queue_options:
        name: 'message_hospital'
    qos_options:
        prefetch_size: 0
        prefetch_count: 1
        global: false
    callback: foo.task_bus.consumer

现在队列如下:

enter image description here

由于DLX属性,消息在先前的消息中失败后便立即进入医院队列。