为什么在使用eventhubs时在ms azure上会收到此错误?

问题描述

我最近开始使用azure,这是一次压倒性的体验。我开始进行eventhubs的实验,基本上我遵循official tutorials方法来学习如何使用nodejs从eventhub发送和接收消息。

一切正常,因此我构建了一个小型Web应用程序(静态前端应用程序),并将其与节点后端连接在一起,在节点后端与eventhubs进行通信。所以基本上我的应用程序是这样构建的:

frontend <----> node server <-----> eventhubs

如您所见,它非常简单。节点服务器正在从eventhub中获取数据,并将其转发到显示值的前端。这是一次很酷的体验,在出现此错误之前,我一直在享受MS Azure:

 azure.eventhub.common.EventHubError: ErrorCodes.ResourceLimitExceeded: Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5. List of connected receivers - nil,nil,nil.

错误确实令人困惑。我使用认的消费者组,并且只有一个应用程序。我从未尝试从另一个应用程序访问此消费者组。它说限制是5,我只使用一个应用程序,所以应该没问题,还是我错过了什么?我没有检查这里发生了什么。

我浪费了很多时间进行谷歌搜索和研究,但是我没有理解。最后,我认为也许每次我在azure上部署该应用程序(前端和节点服务器)时,这都将被视为一个使用者,并且由于我部署该应用程序超过5次,此错误就会出现。我是对的还是这是胡扯?

我非常感谢您提供有关如何解决错误的帮助。实际上,我想了解的多于解决问题所需要的。我想了解它的含义,因此任何信息都将有所帮助。预先感谢。

编辑

我正在使用websockets作为我的应用程序(前端)和节点服务器(后端)之间的通信协议。节点服务器正在使用认使用者组(我没有做任何更改),我只是遵循this official example from Microsoft。我基本上使用的是MS文档中的代码,这就是为什么我没有从节点服务器发布任何代码段的原因,并且由于错误发生在后端而不是前端,因此如果发布任何前端代码将无济于事。

最后,我使用websocket连接前端和后端。它可以完美地运行一两天,然后开始出现此错误。有时我会打开多个客户端(例如,浏览器中的客户端和智能手机中的客户端)。

我认为我不理解这个消费群体的概念。就像每个客户都是消费者吗?因此,如果我在浏览器的5个不同选项卡中打开我的应用程序(同一个应用程序),那么我有5个消费者吗?


我不太理解下面的答案以及“池客户端”的含义,因此,我将尝试在此处发布代码示例,以向您展示我的意图。

代码段:

服务器

这是我在服务器端使用的功能,用于与eventhub通信并接收/使用消息

async function receiveEventhubMessage(socket,eventHubName,connectionString) {

 
  const consumerClient = new EventHubConsumerClient(consumerGroup,connectionString,eventHubName);


  const subscription = consumerClient.subscribe({
      processEvents: async (events,context) => {

        for (const event of events) {
          console.log("[    consumer    ] Message received : " + event.body);

            io.emit('msg-received',event.body);
        }
      },processError: async (err,context) => {
        console.log(`Error : ${err}`);
      }
    }
  );

如果您注意到,我将eventhub和连接字符串作为参数来进行更改。现在在前端,我有一个包含多个主题的列表,每个主题都有自己的eventhubname,但是它们具有相同的eventhub命名空间。 这是我有两个eventhubname的示例:

{
"EventHubName": "eh-test-command"
"EventHubName": "eh-test-telemetry"
}

如果用户选择发送命令(从前端,我只有一个按钮列表,用户可以单击该按钮以通过websocket触发事件),那么CommandEventHubName将从前端发送到节点服务器。服务器将收到该eventhubname并在我上面发布的函数中切换consumerClient。 这是我称之为的代码

// io is a socket.io object
io.on('connection',socket => {
   socket.on('onUserChoice',choice => {
   // choice is an object sent from the frontend based on what the user choosed. e.g if the user choosed command then choice = {"EventhubName": "eh-test-command","payload": "whatever"}

   receiveEventhubMessage(socket,choice.EventHubName,choice.EventHubNameSpace)
              .catch(err => console.log(`[    consumerClient    ] Error while receiving eventhub messages: ${err}`));
      }
}

希望您能在这里看到我正在尝试做的事情。我正在构建的应用程序将来会扩展到汽车领域的实际用例,这就是为什么这对我很重要。因此,我试图弄清楚如何在eventhubname每次更改时不创建新的ConsumerClient的情况下在eventhub之间切换?

感谢您的帮助,但我必须说,我不理解您在“池客户端”中的示例。您能详细说明一下吗,或者如果您能给我一个最小的例子只是为了让我步入正轨,那将是理想的选择。 我非常感谢。

解决方法

根据问题中的对话,似乎根本原因是您的后端正在为来自前端的每个请求创建一个新的EventHubConsumerClient。由于每个客户端都将打开与该服务的专用连接,因此如果使用相同的使用者组对同一Event Hub实例的请求超过5个,则会超出配额。

要解决此问题,您将考虑考虑合并EventHubConsumerClient实例,以便从每个Event Hub实例开始。您可以通过调用subscribe安全地使用池化客户端来处理您的前端请求。这将使您可以在多个前端请求之间共享连接。

关键思想是并非为每个请求都创建consumerClient,而是在请求之间共享一个实例。用您的代码片段说明最简单的方法,您最终会将客户端创建提升到要接收的函数之外。看起来可能像这样:

const consumerClient = new EventHubConsumerClient(consumerGroup,connectionString,eventHubName);

async function receiveEventhubMessage(socket,eventHubName,connectionString) {
  const subscription = consumerClient.subscribe({
      processEvents: async (events,context) => {

        for (const event of events) {
          console.log("[    consumer    ] Message received : " + event.body);
          io.emit('msg-received',event.body);
        }
      },processError: async (err,context) => {
        console.log(`Error : ${err}`);
      }
    }
  );

也就是说,根据应用程序的架构,以上内容可能不足以满足您的环境。如果为每个请求动态创建了托管receiveEventHubMessage的对象,则没有任何变化。在这种情况下,您可能需要考虑使用单例或依赖注入之类的方法来帮助延长使用寿命。

如果最终遇到了无法满足要求的扩展问题,则可以考虑增加每个事件中心的客户端数量和/或将请求分发给不同的消费者群体。