RabbitMQ:确认/确认关闭并重新打开的通道上的消息

问题描述

我从 RabbitMq 服务器收到此错误

通道被服务器关闭:406 (PRECONDITION-Failed) 带有消息“PRECONDITION_Failed - 未知的交付标签 80”

发生这种情况是因为在消费者任务期间连接丢失,最后,当消息被确认/确认时,我收到此错误,因为我无法在与我从中获得消息的频道不同的频道上确认消息。

这是RabbitMq连接的代码

async connect({ prefetch = 1,queueName }) {
    this.queueName = queueName;
    console.log(`[AMQP][${this.queueName}] | connecting`);
    return queue
        .connect(this.config.rabbitmq.connstring)
        .then(conn => {
            conn.once('error',err => {
                this.channel = null;
                if (err.message !== 'Connection closing') {
                    console.error(
                        `[AMQP][${this.queueName}] (evt:error) | ${err.message}`,);
                }
            });

            conn.once('close',() => {
                this.channel = null;
                console.error(
                    `[AMQP][${this.queueName}] (evt:close) | reconnecting`,);
                this.connect({ prefetch,queueName: this.queueName });
            });
            return conn.createChannel();
        })
        .then(ch => {
            console.log(`[AMQP-channel][${this.queueName}] created`);
            ch.on('error',err => {
                console.error(
                    `[AMQP-ch][${this.queueName}] (evt:error) | ${err.message}`,);
            });
            ch.on('close',() => {
                console.error(`[AMQP-ch][${this.queueName}] (evt:close)`);
            });
            this.channel = ch;
            return this.channel;
        })
        .then(ch => {
            return this.channel.prefetch(prefetch);
        })
        .then(ch => {
            return this.channel.assertQueue(this.queueName);
        })
        .then(async ch => {
            while (this.buffer.length > 0) {
                const request = this.buffer.pop();
                await request();
            }
            return this.channel;
        })
        .catch(error => {
            console.error(error);
            console.log(`[AMQP][${this.queueName}] reconnecting in 1s`);
            return this._delay(1000).then(() =>
                this.connect({ prefetch,queueName: this.queueName }),);
        });
}

async ack(msg) {
    try {
        if (this.channel) {
            console.log(`[AMQP][${this.queueName}] ack`);
            await this.channel.ack(msg);
        } else {
            console.log(`[AMQP][${this.queueName}] ack (buffer)`);
            this.buffer.push(() => {
                this.ack(msg);
            });
        }
    } catch (e) {
        console.error(`[AMQ][${this.queueName}] ack error: ${e.message}`);
    }
}

如您所见,在建立连接后创建了一个通道,在我遇到连接问题后,该通道被设置为 NULL,1 秒后连接重试,重新创建一个新通道。

为了管理离线时段,我使用了一个缓冲区来收集在通道为 NULL 时发送的所有 ack 消息,并且在连接重新建立后,我卸载了缓冲区。

所以基本上我必须找到一种方法,在连接丢失或通道因天气原因关闭后发送 ACK。

感谢您的帮助

解决方法

如果连接由于某种原因被丢弃或中断,则无法发送 ACK,因为连接发生在套接字级别,一旦关闭,就无法使用相同的套接字重新创建它。

当连接断开时,消息保持非 ACK,因此另一个侦听器可以处理它,或者当它再次连接时断开连接的侦听器将再次处理它。

在我看来,您正在尝试解决的问题不是由 RabbitMQ 提供的,而是由基础的套接字实现提供的。

您可以通过避免管理消息缓冲区并利用 RabbitMQ 的特性来解决此问题,该特性将在您的侦听器再次连接时重新呈现最后一条未处理的消息。

,

一旦通道关闭(无论是什么原因),您将无法确认消息。代理会自动将相同的消息重新传递给另一个消费者。

这在 RabbitMQ message confirmation 部分有详细记录。

当消费者失败或失去连接时:自动重新排队

当使用手动确认时,当发生传递的通道(或连接)关闭时,任何未被确认的传递(消息)都会自动重新排队。这包括客户端的 TCP 连接丢失、消费者应用(进程)故障和通道级协议异常(如下所述)。

...

由于这种行为,消费者必须准备好处理重新交付,否则在实现时要考虑到幂等性。 Redeliveries 将有一个特殊的布尔属性 redeliver,由 RabbitMQ 设置为 true。对于第一次交付,它将被设置为 false。请注意,消费者可以接收之前传递给另一个消费者的消息。

正如文档所建议的,您需要通过实现消息幂等性设计模式在消费者端处理此类问题。换句话说,您的架构应该准备好处理由于错误而导致的消息重新传递。

或者,您可以禁用消息确认并获得“一次传递”类型的模式。这意味着如果出现错误,您将不得不处理消息丢失。

关于此事的进一步解读: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/

Kafka 引入新语义后的后续工作: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/