如何使用 ConnectionListner 和/或 ChannelListner 记录 RabbitMQ 中消息传递的失败/成功

问题描述

我正在尝试记录在 RabbitMQ 中发送消息期间发生的任何信息或异常,为此我尝试在现有连接工厂中添加 ConnectionListener。

    kRabbitTemplate.getConnectionFactory().addConnectionListener(new ConnectionListener() {

        @Override
        public void onCreate(Connection connection) {
            System.out.println("Connection Created");
        }

        @Override
        public void onShutDown(ShutdownSignalException signal) {
            System.out.println("Connection Shutdown "+signal.getMessage());
        }

    });
    kRabbitTemplate.convertAndSend(exchange,routingkey,empDTO);       
    

为了测试异常场景,我从RabbitMQ控制台解绑甚至删除了队列。但我没有收到任何异常或任何关闭方法调用

虽然,当我停止 RabbitMQ 服务时,我得到了

Exception in thread "Thread-5" org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

但是这个异常不是来自我添加的监听器。

我想知道

  1. 为什么我没有从关闭方法中得到任何异常或调用
  2. 如何使用 ConnectionListner 和/或 ChannelListner 来记录消息传递的失败/成功。
  3. 我们可以使用 AMQP 附加程序吗?如果可以,我们该怎么做? (任何示例/教程)
  4. 确保消息发送的其他方法是什么?

注意:我不想使用发布者确认的方法

解决方法

Connection Refused 不是 ShutdownSignalException - 从未建立连接,因为服务器/端口上不存在代理。

您不能使用侦听器来确认单个消息的发送或返回;使用发布者确认并返回。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#publishing-is-async

有关如何使用附加程序的信息,请参阅文档。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#logging

编辑

要获得连接失败的通知,您目前需要使用其他技术,具体取决于您是发送还是接收。

下面是一个示例:

@SpringBootApplication
public class So66882099Application {

    private static final Logger log = LoggerFactory.getLogger(So66882099Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So66882099Application.class,args);
    }

    @RabbitListener(queues = "foo")
    void listen(String in) {

    }

    // consumer side listeners for no connection

    @EventListener
    void consumerFailed(ListenerContainerConsumerFailedEvent event) {
        log.error(event + " via event listener");
        if (event.getThrowable() instanceof AmqpConnectException) {
            log.error("Broker down?");
        }
    }

    // or

    @Bean
    ApplicationListener<ListenerContainerConsumerFailedEvent> eventListener() {
        return event -> log.error(event + " via application listener");
    }

    // producer side - use a RetryListener

    @Bean
    RabbitTemplate template(ConnectionFactory cf) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
        RetryTemplate retry = new RetryTemplate();
        // configure retries here as needed
        retry.registerListener(new RetryListener() {

            @Override
            public <T,E extends Throwable> boolean open(RetryContext context,RetryCallback<T,E> callback) {
                return true;
            }

            @Override
            public <T,E extends Throwable> void onError(RetryContext context,E> callback,Throwable throwable) {

                log.error("Send failed " + throwable.getMessage());
            }

            @Override
            public <T,E extends Throwable> void close(RetryContext context,Throwable throwable) {
            }

        });
        rabbitTemplate.setRetryTemplate(retry);
        return rabbitTemplate;
    }


    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            try {
                template.convertAndSend("foo","bar");
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...