ActiveMQ Artemis and 5.x listeners at the same time - NullPointerException

Problem description

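I have a legacy Spring 4.2.1.RELEASE application that connects to ActiveMQ 5.x as a listener, and now we are adding a connection to ActiveMQ Artemis. For Artemis we use a durable subscription because we don't want messages on the shared topic to be lost when a subscriber goes down, and we use a shared subscription because we want the option of clustering or of using concurrency to process the messages in the subscription asynchronously. I have a separate ConnectionFactory and listener container for each broker, but the constantly repeating WARN logs indicate that the Artemis DMLC (DefaultMessageListenerContainer) cannot start because of the following NPE: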

java.lang.NullPointerException
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
    at java.lang.Thread.run(Unknown Source)

On the surface it looks as if the method createSharedDurableConsumer cannot be found. Looking at the AbstractMessageListenerContainer I have, line 856 is the call to method.invoke:

/** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
        Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);

...

Method method = (isSubscriptionDurable() ?
        createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
    return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}

Artemis configuration:

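import java.util.HashMap;
import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.Session;

import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.destination.DynamicDestinationResolver;

@Configuration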
public class ArtemisConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory artemisConnectionFactory() {
        ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
                .createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());

        artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
        artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
        artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
        artemisConnectionFactory.setConnectionTTL(env.getRequiredProperty("artemis.connection.ttl.millis", Long.class));
        artemisConnectionFactory
                .setCallFailoverTimeout(env.getRequiredProperty("artemis.call.failover.timeout.millis", Long.class));
        artemisConnectionFactory.setInitialConnectAttempts(
                env.getRequiredProperty("artemis.connection.attempts.initial", Integer.class));
        artemisConnectionFactory
                .setReconnectAttempts(env.getRequiredProperty("artemis.connection.attempts.reconnect", Integer.class));
        artemisConnectionFactory.setRetryInterval(env.getRequiredProperty("artemis.retry.interval.millis", Long.class));
        artemisConnectionFactory
                .setRetryIntervalMultiplier(env.getRequiredProperty("artemis.retry.interval.multiplier", Double.class));
        artemisConnectionFactory.setBlockOnAcknowledge(true);
        artemisConnectionFactory.setBlockOnDurableSend(true);
        artemisConnectionFactory.setCacheDestinations(true);
        artemisConnectionFactory.setConsumerWindowSize(0);
        artemisConnectionFactory.setMinLargeMessageSize(1024 * 1024);

        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);

        cachingConnectionFactory
                .setSessionCacheSize(env.getRequiredProperty("artemis.session.cache.size", Integer.class));
        cachingConnectionFactory.setReconnectOnException(true);

        return cachingConnectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory artemisContainerFactory(
            ConnectionFactory artemisConnectionFactory,
            JmsTransactionManager artemisJmsTransactionManager,
            MappingJackson2MessageConverter mappingJackson2MessageConverter) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        factory.setConnectionFactory(artemisConnectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setMessageConverter(mappingJackson2MessageConverter);
        factory.setSubscriptionDurable(Boolean.TRUE);
        factory.setSubscriptionShared(Boolean.TRUE);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setSessionTransacted(Boolean.TRUE);
        factory.setTransactionManager(artemisJmsTransactionManager);

        return factory;
    }

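    private TransportConfiguration[] createTransportConfigurations() {
        String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
        Map<String, Object> primaryTransportParameters = new HashMap<>(2, 1F);
        String primaryHostname = env.getRequiredProperty("artemis.primary.hostname");
        Integer primaryPort = env.getRequiredProperty("artemis.primary.port", Integer.class);

        primaryTransportParameters.put("host", primaryHostname);
        primaryTransportParameters.put("port", primaryPort);

        // Assumption: the snippet as posted never defines backupTransportParameters.
        // The "artemis.backup.*" property names are hypothetical, mirroring the primary ones.
        Map<String, Object> backupTransportParameters = new HashMap<>(2, 1F);
        backupTransportParameters.put("host", env.getRequiredProperty("artemis.backup.hostname"));
        backupTransportParameters.put("port", env.getRequiredProperty("artemis.backup.port", Integer.class));

        return new TransportConfiguration[] {
                new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
                new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
    }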
}

My pom uses version 2.10.0 of Artemis.

How can I fix this?

Solution

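The JMS 2.0 spec is backward compatible with JMS 1.1, so make sure you have only the JMS 2 spec on your classpath. My hunch is that the reflection calls in the Spring code are getting confused because they hit the JMS 1.1 spec classes instead of the proper JMS 2 spec classes. That would also explain the NPE: the quoted Spring code looks the method up only "if available", so against a JMS 1.1 Session the lookup yields null, and the later method.invoke call is then made on a null reference.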
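
One way to test this hunch is to ask the JVM which javax.jms.Session it actually sees. The following is only a minimal diagnostic sketch, not part of Spring or Artemis: the class name JmsApiCheck and its two helper methods are made up for illustration, and plain reflection stands in for the "look it up, or return null" behaviour of the lookup quoted in the question. It assumes it is run with the same classpath as the application.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical diagnostic helper; the names here are illustrative only.
class JmsApiCheck {

    /** Returns the public method, or null if the class or the method is missing. */
    public static Method findMethodIfAvailable(String className, String methodName, Class<?>... paramTypes) {
        try {
            return Class.forName(className).getMethod(methodName, paramTypes);
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return null;
        }
    }

    /** Describes the jar or directory a class was loaded from. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap or unknown>";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        try {
            Class<?> session = Class.forName("javax.jms.Session");
            Class<?> topic = Class.forName("javax.jms.Topic");
            System.out.println("javax.jms.Session loaded from: " + locationOf(session));

            // Same signature as the lookup quoted in the question.
            Method shared = findMethodIfAvailable(
                    "javax.jms.Session", "createSharedDurableConsumer", topic, String.class, String.class);
            System.out.println(shared != null
                    ? "JMS 2.0 API is visible: createSharedDurableConsumer found"
                    : "JMS 1.1 API is visible: createSharedDurableConsumer is missing");
        } catch (ClassNotFoundException e) {
            System.out.println("javax.jms is not on the classpath");
        }
    }
}
```

If the reported location turns out to be a JMS 1.1 API jar, which an ActiveMQ 5.x client dependency may bring in transitively, excluding that jar so that only the JMS 2.0 API remains is the change the answer above suggests.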
