ThreadPoolTask​​Executor在池上只有一个线程不处理来自AWS队列的消息

问题描述

我已经为应用程序中注册的每个队列创建了一个按需ChannelAdapter,AsyncTaskExecutor和一个Channel。我注意到,当AsyncTaskExecutor的maxPoolSize数量等于1时,将不处理消息。这就是AsyncTaskExecutor bean的创建方式。

 static void registerasyncTaskExecutor(final Consumer consumer,final GenericApplicationContext registry) {
        final TaskExecutor executor = consumer.getExecutor();

        final BeanDeFinitionBuilder builder = BeanDeFinitionBuilder.genericBeanDeFinition(ThreadPoolTaskExecutor.class);
        builder.addPropertyValue("corePoolSize",executor.getCorePoolSize());
        builder.addPropertyValue("maxPoolSize",executor.getMaxPoolSize());
        builder.addPropertyValue("threadNamePrefix",consumer.getName() + "-");

        final String beanName = executor.getName();
        final BeanDeFinition beanDeFinition = builder.getBeanDeFinition();
        registry.registerBeanDeFinition(beanName,beanDeFinition);
    }

我注意到的另一件事是,当此方法称为java.util.concurrent.ThreadPoolExecutor#execute时,此条件workerCountOf(c) < corePoolSize始终为false。 完整的项目链接在此处https://github.com/LeoFuso/spring-integration-aws-demo

解决方法

仅向一个可管理组件提供一个线程池总是一个坏习惯。您可能不知道该组件与线程池有什么关系,这实际上可能是一个事实,即您的单个线程由内部一些长期存在的任务占用,而所有新任务都将停在队列中等待该单个线程是空闲的,这可能不会发生。

事实上,Spring Cloud AWS的AsynchronousMessageListener确实具有上述提到的SqsMessageDrivenChannelAdapter使用的功能:

public void run() {
        while (isQueueRunning()) {

因此,或者依靠默认的执行程序,或者在您自己的线程中提供足够的线程。

看起来那里的逻辑就是这样的线程数:

    int spinningThreads = this.getRegisteredQueues().size();

    if (spinningThreads > 0) {
        threadPoolTaskExecutor
                .setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);

因此,当我们提供SQS队列时,我们具有确切的线程数,加上用于工作程序的2乘数。看来我们需要为每个队列轮询一个线程,并需要额外的线程来处理来自它们的消息。

(虽然不是Spring Integration问题-更像是Spring Cloud AWS)。