问题描述
我已经为应用程序中注册的每个队列创建了一个按需ChannelAdapter,AsyncTaskExecutor和一个Channel。我注意到,当AsyncTaskExecutor的maxPoolSize
的数量等于1时,将不处理消息。这就是AsyncTaskExecutor bean的创建方式。
static void registerasyncTaskExecutor(final Consumer consumer,final GenericApplicationContext registry) {
final TaskExecutor executor = consumer.getExecutor();
final BeanDeFinitionBuilder builder = BeanDeFinitionBuilder.genericBeanDeFinition(ThreadPoolTaskExecutor.class);
builder.addPropertyValue("corePoolSize",executor.getCorePoolSize());
builder.addPropertyValue("maxPoolSize",executor.getMaxPoolSize());
builder.addPropertyValue("threadNamePrefix",consumer.getName() + "-");
final String beanName = executor.getName();
final BeanDeFinition beanDeFinition = builder.getBeanDeFinition();
registry.registerBeanDeFinition(beanName,beanDeFinition);
}
我注意到的另一件事是,当此方法称为java.util.concurrent.ThreadPoolExecutor#execute
时,此条件workerCountOf(c) < corePoolSize
始终为false。
完整的项目链接在此处https://github.com/LeoFuso/spring-integration-aws-demo
解决方法
仅向一个可管理组件提供一个线程池总是一个坏习惯。您可能不知道该组件与线程池有什么关系,这实际上可能是一个事实,即您的单个线程由内部一些长期存在的任务占用,而所有新任务都将停在队列中等待该单个线程是空闲的,这可能不会发生。
事实上,Spring Cloud AWS的AsynchronousMessageListener
确实具有上述提到的SqsMessageDrivenChannelAdapter
使用的功能:
public void run() {
while (isQueueRunning()) {
因此,或者依靠默认的执行程序,或者在您自己的线程中提供足够的线程。
看起来那里的逻辑就是这样的线程数:
int spinningThreads = this.getRegisteredQueues().size();
if (spinningThreads > 0) {
threadPoolTaskExecutor
.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
因此,当我们提供SQS队列时,我们具有确切的线程数,加上用于工作程序的2
乘数。看来我们需要为每个队列轮询一个线程,并需要额外的线程来处理来自它们的消息。
(虽然不是Spring Integration问题-更像是Spring Cloud AWS)。