在 RabbitMq 连接上检测异常

问题描述

使用 spring 集成将消息从 RabbitMQ 传输到 MQ 效果很好。

如果我停止 RabbitMQ 服务器,那么日志文件就会出错:

ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320,reply-text=CONNECTION_FORCED - Node was put into maintenance mode,class-id=0,method-id=0)

我们如何拦截这个异常?

这在 jms ExceptionListener添加 DefaultMessageListenerContainer效果很好

如下配置beans:

<bean id="connectionAmqpFactorySrc" class="com.rabbitmq.client.ConnectionFactory">
    <property name="automaticRecoveryEnabled" value="true"/>
    <property name="networkRecoveryInterval" value="10000"/>
</bean>

<rabbit:connection-factory  id="rabbitConnectionFactory" connection-factory="connectionAmqpFactorySrc"
    username="guest" 
    password="guest" 
    addresses="XX.XX.XX.XX"
    cache-mode="CONNECTION" 
    virtual-host="/"  
    shuffle-addresses="true" />


<bean id="fixedBackOffRabbitMQ" class="org.springframework.util.backoff.FixedBackOff">
    <constructor-arg index="0" value="10000" />
    <constructor-arg index="1" value="3" />
</bean>
    
<bean id="myListener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
    <property name="queueNames" value="MyQueue" />
    <property name="recoveryBackOff" ref="fixedBackOffRabbitMQ"/>
    <property name="channelTransacted" value="true"></property>
    <property name="errorHandler" ref="errorHandler"></property>
</bean>
    
<int-amqp:inbound-channel-adapter   channel="channelRmqMQ" 
        id="inboundChannelAdapter" 
        auto-startup="true" listener-container="myListener" error-channel="processChannel1" />

EDIT1

正如你告诉我的,我使用这样的 bean 的定义:

<bean id="listeners" class="java.util.ArrayList">
    <constructor-arg>
        <list>
            <ref bean="connectionAmqpListener" />
        </list>
    </constructor-arg>
</bean>

<bean id="rabbitConnectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="connectionAmqpFactorySrc"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="addresses" value="XX.XX.XX.XX"/>
    <property name="cacheMode" value="CONNECTION"/>
    <property name="virtualHost" value="/"/>
    <property name="shuffleAddresses" value="true"/>
    <property name="connectionListeners" ref="listeners"/>
</bean>

使用 ConnectionAmqpListener.java

public class ConnectionAmqpListener implements ConnectionListener {
    
    private final Log LOG = LogFactory.getLog(ConnectionAmqpListener.class);
    
    public ConnectionAmqpListener() {
        super();
    }
    
    public void onCreate(Connection connection) {
        System.out.println("Open connection");
    }
    
    public void onClose(Connection connection) {
        System.out.println("Connection is closed");
    }
    
    public void onShutDown(ShutdownSignalException signal) {
        System.out.println("Connection is shutdown");
        System.exit(-1);
    }
}   

这很好用,当我停止代理时,方法 onShutDown调用

但是如果我重新启动我的进程(代理关闭),我的日志文件中没有任何消息并且进程停止。

如果连接失败,您有什么关于如何获取信息的建议吗?

结束编辑1

感谢您的帮助

问候,

埃里克

解决方法

ConnectionFactory

void addConnectionListener(ConnectionListener listener)

那个回调有这个钩子:

/**
 * Called when a connection is force closed.
 * @param signal the shut down signal.
 * @since 2.0
 */
default void onShutDown(ShutdownSignalException signal) {
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...