JMS消息侦听器调用程序无法强制转换为org.apache.qpid.client.AMQDestination

问题描述

我有一个如下的配置文件

@Slf4j
@Configuration
@EnableJms
public class MyQpidConfig {
    @Bean("myQpidConn")
    public AMQConnectionFactory amqConnectionFactory() throws URLSyntaxException {
        AMQConnectionFactory amqConnectionFactory = new AMQConnectionFactory("URL");
        return amqConnectionFactory;
    }

    @Bean("myJmsContainer")
    public DefaultMessageListenerContainer defaultMessageListenerContainer(@Qualifier("myQpidAdaptor") MessageListenerAdapter messageListenerAdapter,@Qualifier("myQpidConn")AMQConnectionFactory connectionFactory) {
        DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
        messageListenerContainer.setConnectionFactory(connectionFactory);
         messageListenerContainer.setDestinationName(destinationName);
        messageListenerContainer.setMessageListener(messageListenerAdapter);
        messageListenerContainer.setPubSubDomain(false);
        messageListenerContainer.setSessionTransacted(true);
        messageListenerContainer.setDurableSubscriptionName("MYSUBSCRIBER");
        messageListenerContainer.setSubscriptionDurable(true);
        messageListenerContainer.setAutoStartup(true);
        messageListenerContainer.setDestination((Queue) () -> "MYQUEUE");
        return messageListenerContainer;
    }

    @Bean("myQpidAdaptor")
    public MessageListenerAdapter messageListenerAdapter() {
        Preconditions.checkNotNull(qpidMessageListner,"Consumer listener not set");
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MyQpidMessageListner());
        messageListenerAdapter.setDefaultListenerMethod("onMessage");
        messageListenerAdapter.setDefaultResponseQueueName("MYQUEUE");
        return messageListenerAdapter;
    }
   
}

目的地名称等于

  ADDR:MYSUBSCRIBER:MYQUEUE; {"create":"receiver","node":{"durable":true,"x-declare":
    {"exclusive":true,"auto-delete":false,"x-bindings":[{"exchange":"amq.topic","key":"MYQUEUE"}]}}}

和我的Listener类如下,

@Slf4j
@Component
public class MyQpidMessageListner implements  MessageListener{
    @Override
    public void onMessage(Message jmsBytesMessage) {
       log.info("Control Inside");
       //to do
    }
}

当我当时正在运行我的应用程序时,作为异步消费者,获得了与目的地有关的异常,

DefaultMessageListenerContainer - 
Setup of JMS message listener invoker Failed for destination 'com.my.MyQpidConfig$$Lambda$637/429515253@3aa1975' - trying to recover. Cause: com.my.MyQpidConfig$$Lambda$637/429515253 cannot be cast to org.apache.qpid.client.AMQDestination  

如果它与AMQDestination相关,那么我如何在Java代码中创建AMQDestination。 我没有有关此QPID问题的任何资源。 任何建议的帮助都必须感谢。

解决方法

由于某种原因,您正在将lambda传递给org.springframework.jms.listener.DefaultMessageListenerContainer.setDestination(Destination),即:

messageListenerContainer.setDestination((Queue) () -> "MYQUEUE");

此方法需要javax.jms.Destination实现(例如org.apache.qpid.client.AMQDestination),而不是lambda。

您已经设置了目的地名称,即:

messageListenerContainer.setDestinationName(destinationName);

目前尚不清楚为什么还要尝试设置目的地。这是配置同一事物的两种方法。

我建议您仅删除试图设置目标的代码,而保留设置目标名称的代码。