如何使用消息队列在 Node JS 中执行长事件处理?

问题描述

我正在 Node JS 中构建电子邮件处理管道,并将 Google Pub/Sub 作为消息队列。消息队列有一个限制,它需要在 10 分钟内确认发送的消息。但是,它发送到 Node JS 服务器的作业可能需要一个小时才能完成。因此,同一个作业可能会运行多次,直到其中一个完成。我担心这会阻塞 Node JS 事件循环并降低服务器速度。

找到附加的架构图。我的问题是:

  1. 鉴于消息队列期望在 10 分钟内得到响应,我是否应该使用消息队列来启动这项长时间运行的作业,还是应该考虑其他一些架构?
  2. 如果启动了多个此类作业,我是否应该担心 Node JS 事件循环被阻塞。每个作业基本上都在遍历 MongoDB 游标,从而创建数十万封电子邮件

architecture diagram

解决方法

好吧,听起来您要么不应该使用该队列(超时时间无法更改),要么您应该将您的工作分解为在超时之前很容易完成的工作。这听起来像是您只需要将工具与工作要求相匹配的情况。如果该队列不符合您的要求,您可能需要不同的机制。我不完全了解您需要从 Google 的 pub/sub 中获得什么,但是如果您只想序列化对一堆作业的访问,那么创建自己的队列或在 NPM 上查找通用队列通常相当容易。

只要您的所有 I/O 都使用异步方法,我相当怀疑您是否存在 nodejs 事件循环阻塞问题。您所做的任何事情听起来都不会占用大量 CPU,这就是阻止事件循环(长时间运行占用大量 CPU 的操作)的原因。您的整个项目可能同时受到 MongoDB 和您用于发送电子邮件的任何内容的限制,因此您应该确保您不会让其中任何一个不堪重负,以至于它们变得缓慢并失去吞吐量。

,

回答原始问题:

  1. 鉴于消息队列期望在 10 分钟内得到响应,我是否应该使用消息队列来启动这个长时间运行的作业? 我应该考虑其他一些架构吗?

是的,消息队列非常适合处理这些类型的事件。重要的是要确保最终的动作是幂等的,这样即使您不小心处理了重复的事件,最终结果也会应用一次。这个来自 Google Cloud 的 guide 是使您的订阅者幂等的有用资源。

为了解决 Pub/Sub 的 10 分钟限制,我最终创建了一个内存表来跟踪活动作业。如果正在积极处理作业并且 Pub/Sub 再次发送消息,则它不会执行任何操作。如果服务器重启并丢失作业,内存表也会消失,因此如果作业不完整,可以再次处理。

  1. 如果启动了多个此类作业,我是否应该担心 Node JS 事件循环被阻塞。每个工作基本上都是通过一个 MongoDB 游标创建了数十万封电子邮件。 根据 jfriend00 留下的评论,我暂时忽略了这一点。您还可以对正在处理的作业数量进行速率限制。