Spring Integration Channel卡住并且不处理消息

问题描述

最近,我们向弹簧集成轮询器引入了任务执行器,以加快文件读取过程。但是引入任务执行程序会导致意外问题,因为我们的服务停止了在Spring集成渠道中处理消息

  <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/file
        http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/integration/jdbc 
        http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
        http://www.springframework.org/schema/integration/amqp 
        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd">


    
    <integration:channel id="filesIn" />
    <integration:channel id="toArchive" />
    
    
    <integration:channel id="toRabbitForRO" />
    <integration:channel id="outputFilesROIn" />
    
    <integration:channel id="toRabbitForSA" />
    <integration:channel id="outputFilesSAIn" />
    
    <int-file:inbound-channel-adapter directory="${rnr.file.directory}" auto-startup="true"
                            filter="filterFiles" channel="filesIn">
                <integration:poller 
                    cron="0 0,5,10,15,20,25,30,35,40,45,50,55 0-9,18-23 * * ?"
                    task-executor="largeFileTaskExecutor" 
                    max-messages-per-poll="${max-messages}"/>
    </int-file:inbound-channel-adapter>
    
    
    <integration:service-activator input-channel="filesIn" output-channel="toArchive"
                     ref="processSingleLargeFile" method="process"></integration:service-activator>
    
    
    <int-file:outbound-channel-adapter channel="toArchive" delete-source-files="true"
                                         directory="file:${rnr.file.directory}/archive">
    </int-file:outbound-channel-adapter>
    
    
     <int-file:inbound-channel-adapter directory="${roOutputDir}"
                                    auto-startup="true" filename-pattern="*.xml" channel="outputFilesROIn">
                    <integration:poller fixed-delay="200"  
                       task-executor="smallFileTaskExecutor" 
                       max-messages-per-poll="${max-messages}" ></integration:poller>
     </int-file:inbound-channel-adapter>
    
     
     <integration:service-activator input-channel="outputFilesROIn" 
                                output-channel="toRabbitForRO" ref="processMultipleFiles" method="processROFile"></integration:service-activator>



    <int-amqp:outbound-channel-adapter
        channel="toRabbitForRO" amqp-template="rabbitTemplate" exchange-name="sample-excahnge"
        routing-key="sample-key" />
        
        

</beans>

在poller中引入的第一个任务执行器可以完美运行。但是第二个轮询器似乎不起作用。它不会从提到的目录中读取文件


     <int-file:inbound-channel-adapter directory="${roOutputDir}"
                                    auto-startup="true" filename-pattern="*.xml" channel="outputFilesROIn">
                    <integration:poller fixed-delay="200"  
                       task-executor="smallFileTaskExecutor" 
                       max-messages-per-poll="${max-messages}" ></integration:poller>
     </int-file:inbound-channel-adapter>
// this file channel adapter is not working . No message appear in output channel **outputFilesROIn**

smallFileTaskExecutor和largeFileTaskexecutor具有2个线程作为核心池大小。 每个轮询者的每次轮询的最大邮件数定义为2

我们的服务未处理消息时的线程转储:https://fastthread.io/my-thread-report.jsp?p=c2hhcmVkLzIwMjAvMDgvMTkvLS1hcGktZWQzZTJmYzMtMWFkYy00Mzk5LWJkZjgtNzk0NGQwMzdjNjIwMjg2Njk5ZDMtYTFmNC00YzIzLThmZTQtYzQ4Nzg4NmNhNGM1LnR4dC0t&

PS:在读取文件中实现并发时遵循了How to read and process multiple files concurrently in spring?

解决方法

根据您的线程转储,看起来带有10个线程池的默认任务调度程序似乎很忙。您的cron可能非常激进,并且有了largeFileTaskExecutor,它就有机会触发疯狂的预定任务。这样,您的第二个轮询者就没有任务计划程序中的资源来完成其工作。

考虑调整您的cron或将该任务计划程序重新配置为更大的线程池。或者只是不要使用执行程序,因为它向我们确认不会提高您的性能。

有关调度程序线程池的信息,请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/configuration.html#namespace-taskscheduler