spring-amqp:使用NACKS RECEIVED消息关闭通道

问题描述

通过在RabbitMQ总线上使用重负载的spring-amqp,有时我们会从org.springframework.amqp.rabbit.connection.CachingConnectionFactory中获取日志: 频道关闭:清除频道关闭;协议方法:#method 回复代码= 200,回复文本=已接收NACK,类别ID = 0,方法ID = 0)

请您解释一下该日志,为什么它处于错误级别? 我们需要进行任何调整吗? 预先感谢您的回答。

解决方法

如果所有发布者确认都未随超时返回,则通道引发异常...

@Override
public void waitForConfirmsOrDie(long timeout)
    throws IOException,InterruptedException,TimeoutException
{
    try {
        if (!waitForConfirms(timeout)) {
            close(AMQP.REPLY_SUCCESS,"NACKS RECEIVED",true,null,false);
            throw new IOException("nacks received");
        }
    } catch (TimeoutException e) {
        close(AMQP.PRECONDITION_FAILED,"TIMEOUT WAITING FOR ACK");
        throw(e);
    }
}

如果回复文本为DefaultChannelCloseLogger,则OK仅跳过正常关闭(200)...

/**
 * Return true if the {@link ShutdownSignalException} reason is AMQP.Channel.Close and
 * the reply code was AMQP.REPLY_SUCCESS (200) and the text equals "OK".
 * @param sig the exception.
 * @return true for a normal channel close.
 */
public static boolean isNormalChannelClose(ShutdownSignalException sig) {
    Method shutdownReason = sig.getReason();
    return isNormalShutdown(sig) ||
            (shutdownReason instanceof AMQP.Channel.Close
                && AMQP.REPLY_SUCCESS == ((AMQP.Channel.Close) shutdownReason).getReplyCode()
                && "OK".equals(((AMQP.Channel.Close) shutdownReason).getReplyText()));
}

如果要忽略这些错误,可以配置自定义关闭异常记录器:

/**
 * Set the strategy for logging close exceptions; by default,if a channel is closed due to a failed
 * passive queue declaration,it is logged at debug level. Normal channel closes (200 OK) are not
 * logged. All others are logged at ERROR level (unless access is refused due to an exclusive consumer
 * condition,in which case,it is logged at INFO level).
 * @param closeExceptionLogger the {@link ConditionalExceptionLogger}.
 * @since 1.5
 */
public void setCloseExceptionLogger(ConditionalExceptionLogger closeExceptionLogger) {
    Assert.notNull(closeExceptionLogger,"'closeExceptionLogger' cannot be null");
    this.closeExceptionLogger = closeExceptionLogger;
    if (this.publisherConnectionFactory != null) {
        this.publisherConnectionFactory.setCloseExceptionLogger(closeExceptionLogger);
    }
}