问题描述
我需要轮询FTP服务器并处理新的或更改的文件。我将Spring Integration 5.3.2与入站FTP适配器和轮询器配合使用,固定速率为5秒。所有文件都立即下载到本地目录中,但集成处理程序的基础流程每5秒就会对每个文件调用一次。我想在并发线程中立即处理下载的文件列表,但是在流程结束后每5秒轮询一次ftp。我该怎么办?
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.maxFetchSize(ftpProperties.maxFetchSize)
.remoteDirectory(ftpProperties.remoteDirectory)
.localDirectory(File(ftpProperties.downloadDirectory))
.filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(),"ftp-"))
.regexFilter(".*\\.zip$")
) { e -> e.poller(Pollers.fixedrate(Duration.ofSeconds(5))) }
.channel(MessageChannels.executor(Executors.newWorkStealingPool()))
.transform(unZipTransformer())
.handle { m -> LOGGER.info("Unzipped {}",m.headers[FileHeaders.FILENAME]) }
.get()
}
解决方法
设置maxMessagesPerPoll
-默认为1; -1表示无穷大。
Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)
,
听起来更像是您想使用MGET
中的FtpOutboundGateway
命令。因此,您仍然可以使用带有空字符串有效载荷的普通轮询器来发送消息,例如带有所需轮询器选项的 IntegrationFlows.from(() -> "")
。
使用Ftp.outboundGateway()
命令对远程目录调用MGET
会给您List<File>
作为回复消息。因此,您可以随意对文件进行批处理。
有关更多信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway