Spring集成ftp适配器和批处理

问题描述

我需要轮询FTP服务器并处理新的或更改的文件。我将Spring Integration 5.3.2与入站FTP适配器和轮询器配合使用,固定速率为5秒。所有文件都立即下载到本地目录中,但集成处理程序的基础流程每5秒就会对每个文件调用一次。我想在并发线程中立即处理下载的文件列表,但是在流程结束后每5秒轮询一次ftp。我该怎么办?

@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
            .preserveTimestamp(true)
            .maxFetchSize(ftpProperties.maxFetchSize)
            .remoteDirectory(ftpProperties.remoteDirectory)
            .localDirectory(File(ftpProperties.downloadDirectory))
            .filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(),"ftp-"))
            .regexFilter(".*\\.zip$")
    ) { e -> e.poller(Pollers.fixedrate(Duration.ofSeconds(5))) }
            .channel(MessageChannels.executor(Executors.newWorkStealingPool()))
            .transform(unZipTransformer())
            .handle { m -> LOGGER.info("Unzipped {}",m.headers[FileHeaders.FILENAME]) }
            .get()
}

解决方法

设置maxMessagesPerPoll-默认为1; -1表示无穷大。

Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)
,

听起来更像是您想使用MGET中的FtpOutboundGateway命令。因此,您仍然可以使用带有空字符串有效载荷的普通轮询器来发送消息,例如带有所需轮询器选项的 IntegrationFlows.from(() -> "")

使用Ftp.outboundGateway()命令对远程目录调用MGET会给您List<File>作为回复消息。因此,您可以随意对文件进行批处理。

有关更多信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway