工人是否可以根据主题消费?

问题描述

我知道您可以使用 channel.sendToQueue 将消息直接发送到队列,这会造成任务和工作人员的情况,其中只有一个消费者将处理每个任务。

我也知道您可以将 channel.publish 与基于主题的交换一起使用,消息将根据路由键路由到队列。不过,据我所知,这将始终广播给任何匹配队列中的所有订阅者。

我本质上想使用基于主题的交换,但只有一个消费者处理每项任务。我已经阅读了文档,但我看不出有什么方法可以做到这一点。

我的用例:

我在多个位置设置了微服务实例。可能有两个在加利福尼亚,三个在伦敦,一个在新加坡,等等。创建任务时,唯一重要的是它由给定位置的一个实例处理。

当然,我可以创建数百个名为“usa-90210”、“uk-ec1a”等的队列。看起来使用主题会更简洁。考虑到可以使用通配符,它​​也会更加灵活。

如果这不是 RabbitMQ 的功能,我也愿意接受其他想法或想法。

更新

根据 istepaniuk 的建议,我尝试创建两个工作人员,每个工作人员将自己的队列绑定到交换:

const connectionA = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const channelA = await connectionA.createChannel();
channelA.assertExchange('test_exchange','topic',{ durable: false });
await channelA.assertQueue('test_queue_a',{ exclusive: true });
channelA.bindQueue('test_queue_a','test_exchange','usa.*');
await channelA.consume('test_queue_a',() => { console.log('worker a'); },{ noAck: true });

const connectionB = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const channelB = await connectionB.createChannel();
channelB.assertExchange('test_exchange',{ durable: false });
await channelB.assertQueue('test_queue_b',{ exclusive: true });
channelB.bindQueue('test_queue_b','usa.*');
await channelB.consume('test_queue_b',() => { console.log('worker b'); },{ noAck: true });

const pubConnection = await amqplib.connect('amqp://guest:guest@127.0.0.1:5672');
const pubChannel = await pubConnection.createChannel();
pubChannel.assertExchange('test_exchange',{ durable: false });
pubChannel.publish('test_exchange','usa.90210',Buffer.from(''));

不幸的是,两个消费者仍然收到消息。

worker a
worker b

解决方法

是的。

我本质上想使用基于主题的交换,但只有一个消费者处理每项任务。我已经阅读了文档,但我看不出有什么方法可以做到这一点。

使用主题交换并让您的消费者声明并绑定他们自己的队列。你描述的是一个很常见的场景。它在 tutorial 5,"Topics" 中进行了概述。

此外,您可以让多个消费者共享一个队列(只是不要将其声明为 .useredit,.fas.fa-times,#save { display:none; })。这在 tutorial 2,"Workers" 中有描述。

消费者的多个实例可以声明相同的队列和绑定,操作是幂等的。使用 <link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/5.15.2/css/all.min.css" rel="stylesheet"/> <link href="https://cdn.jsdelivr.net/npm/bootstrap@4.5.3/dist/css/bootstrap.min.css" rel="stylesheet"/> <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script> <table class="table usertable"> <thead> </thead> <form class="" action="{{ route('user_edit') }}" method="POST"> <tbody> @csrf <tr> <th scope="col">Name</th> <td class="userdata">Name</td> <td class="useredit"><input name="name" placeholder="Write your name"> <td> <td><a href="#"><i class="fas fa-edit"></i><i class="fas fa-times"></i></a> <td> </tr> <tr> <th scope="col">E-Mail</th> <td class="userdata">email@email.com</td> <td class="useredit"><input name="email" placeholder="Write your Email"> <td> <td><a href="#"><i class="fas fa-edit"></i><i class="fas fa-times"></i></a> <td> </tr> <tr> <th scope="col">Phone</th> <td class="userdata">555555555</td> <td class="useredit"><input name="phone" placeholder=" Enter your phone number"> <td> <td><a href="#"><i class="fas fa-edit"></i><i class="fas fa-times"></i></a> <td> </tr> <tr> <th></th> <td><input id='save' type="submit" value="Save"></td> </tr> </tbody> </form> </table> 队列(而不是 exclusive)还意味着如果您的所有消费者消失或网络出现故障,消息将排队。