重新连接时使用 node-amqplib 取消订阅特定队列

问题描述

问题:远程系统重新连接到多个节点的 websocket 服务器,为每个系统创建/使用 RabbitMQ 中的专用队列。如果不存在活动连接,则应自动删除队列。 Websocket连接/断开事件处理程序是异步的,相当重,观察到的问题是重新连接后断开连接事件处理程序完成,导致系统不一致。

主要问题在于 RabbitMQ 队列 - 最初的解决方案是为每个连接创建唯一的队列并在断开连接时将其删除。看起来很重。

第二种方法是为每个远程系统保留一个专用队列(任何连接的队列名称都相同),问题是 assertQueue 为同一队列添加了消费者。需要找到删除陈旧队列消费者而不删除队列本身的方法

解决方法

解决方案是存储每个远程系统的消费者列表,并在断开连接事件触发取消功能时使用最旧的消费者标签,然后更新给定远程系统的队列消费者列表。

远程系统连接事件

import { Replies } from "amqplib";

// bind callback function for queue-specific messages and store returned consumer description
const result: Replies.Consume = await channel.consume(queueName,this.onSomeMessage.bind(this)); 

// update consumers list for the connected remote system
const consumers: Array<string> | undefined = this.consumers.get(remoteId);
if (consumers === undefined) {
  const consumersList: Array<string> = new Array();
  consumersList.push(result.consumerTag);
  this.consumers.set(remoteId,consumersList);
} else {
  consumers.push(result.consumerTag);
}

关于远程系统断开事件

// remove the oldest consumer in the list and update the list itself
// use cancel method of the amqp channel
const consumers = this.consumers.get(remoteId);
if (consumers === undefined) {  
  // shouldn't happen
  console.error(`consumers list for ${remoteId} is empty`);
} else {
  const consumerTag = consumers[0];
  await this.rxchannel.addSetup(async (channel: ConfirmChannel) => {
    await channel.cancel(consumerTag);
    consumers.shift();
  });
}

代码片段来自某个类的方法实现(如果您想知道“这个”)。

版权声明(特别是对于德国同事):这个答案中的代码可以在 Beerware (https://en.wikipedia.org/wiki/Beerware) 或 MIT 许可证(任何人喜欢的)下使用。