问题描述
我正在使用 Spring 集成进行本地文件复制。我的课程稍后将在本文中介绍。
我通过从另一个类发出 adapter.start() 来启动文件复制。这工作正常,并且适配器 (localFileTransferAdapter) 运行一次(基于 FireOnceTrigger)复制我期望的所有文件。但是我想在复制所有文件后停止适配器。
如何检测适配器已复制文件,以便我可以停止适配器? FireOnceTrigger 将永远不会再次触发 - 但当我从用于启动适配器的类中查询它时,适配器仍然显示正在运行。我可以等待几秒钟并停止适配器 - 但如果有许多大文件要复制,它可能会在复制文件的过程中停止适配器 - 我不希望发生这种情况。
我已查看 How to stop polling after a message is received? Spring Integration,但它似乎与我的用例不符。
预先感谢您的任何帮助。
public class LocalFileTransfer {
@Value("${source.directory:c:/source}")
private String sourceDirectory;
@Value("${target.directory:c:/target}")
private String targetDirectory;
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File(sourceDirectory));
return messageSource;
}
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlows.from(sourceDirectory(),c -> c.autoStartup(false)
.id("localFileTransferAdapter")
.poller(Pollers.trigger(new FireOnceTrigger())
.maxMessagesPerPoll(-1)))
.filter(source -> ((File) source).getName().endsWith(".txt"))
.log(LoggingHandler.Level.ERROR,"localfile.category",m -> m.getPayload())
.log(LoggingHandler.Level.ERROR,m -> m.getHeaders())
.handle(targetDirectory())
.get();
}
@Bean
public MessageHandler targetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(targetDirectory));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
}
解决方法
您在使用 FireOnceTrigger
和 maxMessagesPerPoll(-1)
时走在正确的轨道上。此外,AbstractMessageSourceAdvice
示例的答案完全适合您的情况。
在 MessageSourceMutator.afterReceive()
中,您只需检查 null
参数的 result
并调用该 stop()
的 localFileTransferAdapter
。
由于所有事情都发生在同一个线程上,因此您可以安全地在遇到 null
时调用 stop:无需引入一些延迟,并且通道适配器在该线程中完成生成消息之前不存在轮询周期。