仅当线程池中的线程可用于处理消息时,才将消息从JMS队列中取消排队

问题描述

要求是侦听JMS队列并以异步方式处理请求,并在其他JMS队列上发送响应。仅允许系统一次处理固定数量的请求。除非有可用的线程来处理请求,否则不得将其出队。

  1. 使用@JMSListener批注方法将请求从请求队列中出队。
  2. 增加全局计数器
  3. 侦听器通过调用@Async带注释的方法来执行请求。
  4. 如果计数器==线程池的最大大小,则停止监听。否则,收听者可以自由收听新请求
  5. Async方法处理请求并在Producer服务中调用sendResponse()方法以在响应队列上发送响应
  6. 生产者方法在响应队列上发送响应,并减少全局计数器。
  7. 如果将侦听器bean设置为停止侦听,那么producer方法将把该bean设置为再次开始侦听。

问题: 由于正在调用sendResponse()的线程是线程池线程之一,因此直到sendResponse()方法完成后,该线程才能释放。这意味着即使计数器已递减,线程仍然不可用。

听众:

@Service
public class ActiveMQListener{

    @Autowired
    ApplicationContext applicationContext;

    @Autowired
    AsyncService aSyncService;

    @Autowired
    ThreadPoolCounter threadPoolCounter;

    @JmsListener(id = "req1",destination = "RequestQueue1",containerFactory = "jmsFactory")
    public void receiveRequest(ActiveMQTextMessage message) throws JMSException {
        String requestMsg = messget.getText()
        
        //some validation

        aSyncService.executeRequest(requestMsg);

        if(threadPoolCounter.incrementCounter() == 10){
            JmsListenerEndpointRegistry customregistry = applicationContext.getBean(JmsListenerEndpointRegistry.class);
            MessageListenerContainer listenerContainer = customregistry.getListenerContainer("req1");
            listenerContainer.stop();
        }
    }
}

生产者:

@Service
public class ActiveMQProducer {

    @Autowired
    jmstemplate jmstemplate;

    @Autowired
    ThreadPoolCounter threadPoolCounter;

    @Autowired
    ApplicationContext applicationContext;


    public void sendResponse(ActiveMQTextMessage responseMsg) throws JMSException {
        jmstemplate.convertAndSend("res1",responseMsg);

        threadPoolCounter.decrement();

        JmsListenerEndpointRegistry customregistry = applicationContext.getBean(JmsListenerEndpointRegistry.class);
        MessageListenerContainer listenerContainer = customregistry.getListenerContainer("req1");

        if(!listenerContainer.isRunning()){
            listenerContainer.start();
        }
    }
}

异步方法

@Service
public class AsyncService{

    @Autowired
    ActiveMQProducer activeMQProducer;

    @Async("RequestExecutor")
    public void executeRequest(String requestMsg) {
        String resposeMsg = "";

        //Some Async Processing

        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText(responseMsg);
        activeMQProducer.sendResponse(message);
    }

}

我想到过的事情:
将侦听器的并发性提高为=线程池大小。将异步方法的返回类型更改为返回Future。这应该解决确保仅在有线程可处理请求时才将请求出队的问题。但是,在这种情况下,线程数增加了一倍。例如,如果线程池大小为10,那么我将有10个侦听器线程在等待future.get()。如果将来必须增加要处理的请求的数量,那么它会受到侦听器线程数量的影响。

@Service
public class ActiveMQListener{

    @Autowired
    AsyncService aSyncService;

    @Autowired
    ActiveMQProducer activeMQProducer;


    @JmsListener(id = "req1",containerFactory = "jmsFactory")
    public void receiveRequest(ActiveMQTextMessage message) throws JMSException {
        String requestMsg = messget.getText()
        
        //some validation

        Future<String> future = aSyncService.executeRequest(requestMsg);
        String responseMsg = future.get();

        activeMQProducer.sendResponse(responseMsg);
    }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)