问题描述
要求是侦听JMS队列并以异步方式处理请求,并在其他JMS队列上发送响应。仅允许系统一次处理固定数量的请求。除非有可用的线程来处理请求,否则不得将其出队。
- 使用@JMSListener批注方法将请求从请求队列中出队。
- 增加全局计数器
- 侦听器通过调用@Async带注释的方法来执行请求。
- 如果计数器==线程池的最大大小,则停止监听。否则,收听者可以自由收听新请求
- Async方法处理请求并在Producer服务中调用sendResponse()方法以在响应队列上发送响应
- 生产者方法在响应队列上发送响应,并减少全局计数器。
- 如果将侦听器bean设置为停止侦听,那么producer方法将把该bean设置为再次开始侦听。
问题: 由于正在调用sendResponse()的线程是线程池线程之一,因此直到sendResponse()方法完成后,该线程才能释放。这意味着即使计数器已递减,线程仍然不可用。
听众:
@Service
public class ActiveMQListener{
@Autowired
ApplicationContext applicationContext;
@Autowired
AsyncService aSyncService;
@Autowired
ThreadPoolCounter threadPoolCounter;
@JmsListener(id = "req1",destination = "RequestQueue1",containerFactory = "jmsFactory")
public void receiveRequest(ActiveMQTextMessage message) throws JMSException {
String requestMsg = messget.getText()
//some validation
aSyncService.executeRequest(requestMsg);
if(threadPoolCounter.incrementCounter() == 10){
JmsListenerEndpointRegistry customregistry = applicationContext.getBean(JmsListenerEndpointRegistry.class);
MessageListenerContainer listenerContainer = customregistry.getListenerContainer("req1");
listenerContainer.stop();
}
}
}
生产者:
@Service
public class ActiveMQProducer {
@Autowired
jmstemplate jmstemplate;
@Autowired
ThreadPoolCounter threadPoolCounter;
@Autowired
ApplicationContext applicationContext;
public void sendResponse(ActiveMQTextMessage responseMsg) throws JMSException {
jmstemplate.convertAndSend("res1",responseMsg);
threadPoolCounter.decrement();
JmsListenerEndpointRegistry customregistry = applicationContext.getBean(JmsListenerEndpointRegistry.class);
MessageListenerContainer listenerContainer = customregistry.getListenerContainer("req1");
if(!listenerContainer.isRunning()){
listenerContainer.start();
}
}
}
异步方法:
@Service
public class AsyncService{
@Autowired
ActiveMQProducer activeMQProducer;
@Async("RequestExecutor")
public void executeRequest(String requestMsg) {
String resposeMsg = "";
//Some Async Processing
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(responseMsg);
activeMQProducer.sendResponse(message);
}
}
我想到过的事情:
将侦听器的并发性提高为=线程池大小。将异步方法的返回类型更改为返回Future。这应该解决确保仅在有线程可处理请求时才将请求出队的问题。但是,在这种情况下,线程数增加了一倍。例如,如果线程池大小为10,那么我将有10个侦听器线程在等待future.get()。如果将来必须增加要处理的请求的数量,那么它会受到侦听器线程数量的影响。
@Service
public class ActiveMQListener{
@Autowired
AsyncService aSyncService;
@Autowired
ActiveMQProducer activeMQProducer;
@JmsListener(id = "req1",containerFactory = "jmsFactory")
public void receiveRequest(ActiveMQTextMessage message) throws JMSException {
String requestMsg = messget.getText()
//some validation
Future<String> future = aSyncService.executeRequest(requestMsg);
String responseMsg = future.get();
activeMQProducer.sendResponse(responseMsg);
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)