使用不同的消息选择器创建 DefaultMessageListenerContainer 运行时

问题描述

下面是我尝试在部署应用程序后创建动态 Listenercontainer 实例的代码。如果没有消息选择器(注释代码),我可以看到我的消息正在被消费。一旦我添加了 setMessageSelector,消息就不会被消耗。 我将生产者配置为使用两个不同的消息选择器生成消息,比如 color='RED' 和另一个 color='BLUE'。我已经使用 Spring XML 配置连接了“RED”。并且此配置可以正常工作。我能够看到消费者正在消费的消息。但是当我尝试创建一个带有 color='BLUE' 的动态 bean 时,它不起作用。如果我将它添加到 spring XML 中,同样可以正常工作

'''

DefaultMessageListenerContainer defaultMessageListenerContainer=new DefaultMessageListenerContainer();
       defaultMessageListenerContainer.setAutoStartup(Boolean.FALSE);
       defaultMessageListenerContainer.setMessageListener(this.getMessageListener());
       //defaultMessageListenerContainer.setMessageSelector(this.getMessageSelector());
       defaultMessageListenerContainer.setBeanName(this.getBeanName());
       defaultMessageListenerContainer.setConnectionFactory(this.getConnectionFactory());
       defaultMessageListenerContainer.setDestination((Destination) this.getApplicationContext().getBean("customDestination"));
       defaultMessageListenerContainer.setSessionTransacted(Boolean.TRUE);
       defaultMessageListenerContainer.setConcurrentConsumers(1);
       defaultMessageListenerContainer.setMaxConcurrentConsumers(5);
       defaultMessageListenerContainer.initialize();
       defaultMessageListenerContainer.afterPropertiesSet();
       defaultMessageListenerContainer.start();
       System.out.println(defaultMessageListenerContainer.isRunning());
       System.out.println(defaultMessageListenerContainer.isAcceptMessagesWhileStopping());
       System.out.println(defaultMessageListenerContainer.isRegisteredWithDestination());
       ConfigurableListablebeanfactory beanfactory = ((ConfigurableApplicationContext) applicationContext).getbeanfactory();
       beanfactory.registerSingleton("jmsRequestListenerContainer",defaultMessageListenerContainer);

<bean id="jmsRequestListenerContainerdefault" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="1" />
    <property name="maxConcurrentConsumers" value="5" />
    <property name="cacheLevel"      value="0"/>
    <property name="connectionFactory"   ref="queueConnectionFactory" />
    <property name="destination"          ref="customeDestination"/>
    <property name="sessionTransacted"  value="true"/>
    <property name="messageListener" ref="jmsRequestListener" />
    <property name="messageSelector" value="color='RED'"/>
</bean>


<jee:jndi-lookup id="queueConnectionFactory" jndi-name="java:/JmsXA"/>

'''

在应用程序启动后动态创建代码时,我是否遗漏了代码中的某些内容

解决方法

这对我来说很好用...

@SpringBootApplication
public class So66359276Application {

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext ctx = SpringApplication.run(So66359276Application.class,args);
        Thread.sleep(5_000);
        ctx.close();
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template,ConnectionFactory cf,GenericApplicationContext ctx) {
        template.setDefaultDestinationName("foo");
        return args -> {
            createContainer("RED",new JmsRequestListener1(),cf,ctx);
            createContainer("BLUE",new JmsRequestListener2(),ctx);
            IntStream.range(0,10).forEach(i -> {
                template.convertAndSend("foo" + i,msg -> {
                    msg.setStringProperty("color","RED");
                    return msg;
                });
            });
            IntStream.range(0,10).forEach(i -> {
                template.convertAndSend("bar" + i,"BLUE");
                    return msg;
                });
            });
        };
    }

    private void createContainer(String color,MessageListener listener,GenericApplicationContext ctx) {

        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(cf);
        container.setSessionTransacted(true);
        container.setDestinationName("foo");
        container.setMessageSelector("color='" + color + "'");
        container.setMessageListener(listener);
        ctx.registerBean("container" + color,DefaultMessageListenerContainer.class,() -> container);
        ctx.getBean("container" + color,DefaultMessageListenerContainer.class).start();
    }

}

class JmsRequestListener1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("1:" + ((TextMessage) message).getText() + " - "
                    + message.getStringProperty("color"));
        }
        catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

class JmsRequestListener2 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("2:" + ((TextMessage) message).getText() + " - "
                    + message.getStringProperty("color"));
        }
        catch (JMSException e) {
            e.printStackTrace();
        }
    }

}
1:foo0 - RED
1:foo1 - RED
1:foo2 - RED
1:foo3 - RED
1:foo4 - RED
1:foo5 - RED
1:foo6 - RED
1:foo7 - RED
1:foo8 - RED
1:foo9 - RED
2:bar0 - BLUE
2:bar1 - BLUE
2:bar2 - BLUE
2:bar3 - BLUE
2:bar4 - BLUE
2:bar5 - BLUE
2:bar6 - BLUE
2:bar7 - BLUE
2:bar8 - BLUE
2:bar9 - BLUE

registerBean 是一种在运行时添加 bean 定义的新方法,但使用 registerSingleTon 也应该有效。