问题描述
我正在将RabbitMQ服务器(v3.8.9)与Java客户端一起使用。 用例是: 我们的后端为不同的客户创建消息。我们将它们发送到各自的端点。
- 1个生产者->出站队列-> 1个消费者
- 生产者为n个客户创建消息
- 哪个消费者应该发送到客户的端点
- 关于每个客户的消息必须保持正确的顺序
工作正常,除非所有客户端都已启动并正在运行。问题:如果一个客户端不可用,我们需要为此建立防弹重试机制。 说:
- 等待1分钟,然后重试
- 以下所有消息均不得在第一个失败之前传递,并保持正确顺序
- 如果重试有效,则所有其他消息应立即发送给客户端
如您所见,这不是仅“暂停”使用者的解决方案,因为它仍应将msg传递给其他(活动的)客户端。由于应用程序限制和动态的客户端数量,我们无法为每个客户端队列产生一个使用者。 目前,我最好的方法是为每个客户端动态创建一个队列,然后将其路由到单个出站队列。如果使用者无法向一个客户端传递一个消息,那么我想“暂停”客户端队列x分钟。像“ queue_pause('client_q1','5 Minutes')”之类的API调用会有所帮助。但是即使那样,我也必须处理其他已经路由到该特定客户端的消息,并保持它们的正确顺序... 还有更好的主意吗?
解决方法
我认为这里的关键是单个使用者脚本可以从多个队列中使用。因此,如果我理解正确,则可以将其建模为:
- 每个客户端都有自己的队列。这些可以由使用者脚本启动时创建,也可以由创建新客户端时的后端进程创建。
- 使用者脚本分别订阅每个队列
- 收到消息后,消费者尝试立即将其发送给客户端;如果成功,则将manually acknowledged与
basic.ack
一起使用,并且消费者准备向该客户端发送下一条消息 。 - 当无法将邮件传递给客户端时,将其重新排队(
basic.nack
或basic.reject
和requeue=1
),并保留其在客户端队列中的位置。 - 然后,使用者需要从该特定队列中暂停消费。根据其编写方式,在该特定线程中可以像
sleep
一样简单,但是如果不切实际,则可以有效地将 subscription “暂停”到队列中:- 取消对该队列的订阅,其他订阅保持不变
- 将队列名称和重试时间存储在适当的变量中
- 如果使用者脚本是通过事件/轮询循环实现的,则每次在该循环附近检查“已暂停”订阅的列表;如果已达到重试时间,请重新订阅。
- 或者,如果库/框架支持它,则注册一个延迟的事件,该事件将在适当的时间触发并重新预订该队列。具体的机制取决于您使用的技术。
- 所有其他订阅将继续,因此将向其他客户端发送邮件。没有订阅者的队列将按顺序保留脱机客户端的消息,直到使用者脚本开始再次使用它们为止。