Spring Integration - 复制文件后停止入站适配器

问题描述

我正在使用 Spring 集成进行本地文件复制。我的课程稍后将在本文中介绍。

我通过从另一个类发出 adapter.start() 来启动文件复制。这工作正常,并且适配器 (localFileTransferAdapter) 运行一次(基于 FireOnceTrigger)复制我期望的所有文件。但是我想在复制所有文件后停止适配器。

如何检测适配器已复制文件,以便我可以停止适配器? FireOnceTrigger 将永远不会再次触发 - 但当我从用于启动适配器的类中查询它时,适配器仍然显示正在运行。我可以等待几秒钟并停止适配器 - 但如果有许多大文件要复制,它可能会在复制文件的过程中停止适配器 - 我不希望发生这种情况。

我已查看 How to stop polling after a message is received? Spring Integration,但它似乎与我的用例不符。

预先感谢您的任何帮助。

public class LocalFileTransfer {

    @Value("${source.directory:c:/source}")
    private String sourceDirectory;

    @Value("${target.directory:c:/target}")
    private String targetDirectory;

    @Bean
    public MessageSource<File> sourceDirectory() {
        FileReadingMessageSource messageSource = new FileReadingMessageSource();
        messageSource.setDirectory(new File(sourceDirectory));
        return messageSource;
    }

    @Bean
    public IntegrationFlow fileMover() {
        return IntegrationFlows.from(sourceDirectory(),c -> c.autoStartup(false)
                        .id("localFileTransferAdapter")
                        .poller(Pollers.trigger(new FireOnceTrigger())
                .maxMessagesPerPoll(-1)))
                .filter(source -> ((File) source).getName().endsWith(".txt"))
                .log(LoggingHandler.Level.ERROR,"localfile.category",m -> m.getPayload())
                .log(LoggingHandler.Level.ERROR,m -> m.getHeaders())
                .handle(targetDirectory())
                .get();
    }
    @Bean
    public MessageHandler targetDirectory() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(targetDirectory));
        handler.setFileExistsMode(FileExistsMode.REPLACE);
        handler.setExpectReply(false);
        return handler;
   }
}

解决方法

您在使用 FireOnceTriggermaxMessagesPerPoll(-1) 时走在正确的轨道上。此外,AbstractMessageSourceAdvice 示例的答案完全适合您的情况。

MessageSourceMutator.afterReceive() 中,您只需检查 null 参数的 result 并调用该 stop()localFileTransferAdapter

由于所有事情都发生在同一个线程上,因此您可以安全地在遇到 null 时调用 stop:无需引入一些延迟,并且通道适配器在该线程中完成生成消息之前不存在轮询周期。