BullMQ - 服务器重启之间的持久性作业排队和管理作业

问题描述

任务的小背景- 我正在为 Shopify 构建废弃的购物车恢复系统。用户结帐后,Shopify 调用我们的 webhook,webhook 将该请求作为作业排队,延迟 5 分钟在队列 A 中。当工作人员开始处理该作业时,它会检查该结帐是否已付款。如果尚未付款,则会向用户发送购物车恢复消息。

我正在使用 Node.js、Express.js、Redis 和 BullMQ 来实现服务器和排队系统。 我已经尝试过 BUllMQ 的基本示例。在网络上,找不到有关如何在生产级系统中使用它的一些高级示例。

现在,我被以下问题困住了 -

  1. 由于 Redis 是内存数据库,我必须将每个传入的作业保存在我的 MongoDB 集合中,最初状态为 PENDING 并侦听完成事件以将该状态更改为数据库中的 COMPLETED。每当我的服务器重新启动时,我都会获取所有处于 PENDING 状态的作业并将它们添加到队列中。这样我们就可以恢复我们的工作,即使 Redis 宕机或重启。我的问题是—— 在生产级应用程序中做这样的事情有意义吗?我应该在 MongoDB 中保存工作吗?以及在实施此流程时我应该采取哪些其他注意事项?

  2. 现在,Bull 队列(我们将其命名为“A”)正在快速服务器中初始化。每次服务器重新启动时,队列 A 也会被初始化。我的问题是—— 是否重新初始化队列 A,删除 Redis 上的旧队列 A?我还能做些什么来解决这个问题?

我将非常感谢您对此的任何帮助。

解决方法

让我回答你的问题:

由于 Redis 是内存数据库,我必须将每个传入的作业保存在我的 MongoDB 集合中,最初状态为 PENDING 并侦听完成事件以将该状态更改为数据库中的 COMPLETED。每当我的服务器重新启动时,我都会获取所有处于 PENDING 状态的作业并将它们添加到队列中。这样我们就可以恢复我们的工作,即使 Redis 宕机或重启。我的问题是 - 在生产级应用程序中做这样的事情有意义吗?,我应该在 MongoDB 中保存工作吗?以及在实施此流程时我还应该采取哪些其他注意事项?

尽管 Redis 是内存数据库,但您可以在大多数云提供商中启用 persistence,或者按照 AWS Elasticache 中的标准添加 replication,这将提供非常可靠的系统。当然,您应该像使用任何数据库一样为 Redis 持久化数据安排备份,所有托管的 Redis 提供商都会这样做。 重要的是在您的 Redis 实例内存不足时保持警报,因为这将使您的队列停止工作。您可以启用 "removeOnComplete" 以避免堆积您不再关心的作业。

现在,Bull 队列(让我们命名为“A”)正在快速服务器中初始化。每次服务器重新启动时,队列 A 也会被初始化。我的问题是 - 重新初始化队列 A 是否会删除 Redis 上的旧队列 A?我还能做些什么来解决这个问题?

不,Bull/BullMQ 旨在让您可以根据需要经常实例化相同的 Queue,并且不会丢失任何数据。事实上,这就是您可以通过为给定队列实例化任意数量的工作人员来扩展工作人员的方式。