Spring Sftp在入站适配器消息处理程序中使用出站网关获取文件

问题描述

我正在使用Java DSL使用入站适配器来轮询SFTP服务器上的pdf文件我有一个用例,在获取pdf文件后,应用程序将获取CSV格式的配置文件,该配置文件在SFTP服务器上具有相同的名称提取配置文件后,应用程序将使用配置文件中定义的属性处理原始pdf文件,并使用出站适配器将其上传回SFTP服务器。

我在使用出站网关在同一线程上的处理程序中获取配置文件时遇到问题。

这是我的代码

注册集成流程:

  for (String client : clientsArr) {
      this.flowContext.registration(getInboundIntegrationFlow(client)).register();
  }

  this.flowContext.registration(getoutboundIntegrationFlow()).register();
  this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();

入站适配器集成流程:


  @Autowired
  private SftpPdfMessageHandler messageHandler;

  private IntegrationFlow getInboundIntegrationFlow(String client) {

    String remoteDirectory = getRemoteDirectory(client);
    String localDirectory = getLocalDirectory(client);
    String inboundAdapterId = getInboundAdapterId(client);

    return IntegrationFlows
        .from(Sftp.inboundAdapter(sftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory(remoteDirectory)
                .autocreateLocalDirectory(true)
                .localDirectory(new File(localDirectory))
                .maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
                .filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
                .deleteRemoteFiles(true),e -> e.id(inboundAdapterId)
                .autoStartup(true)
                .poller(Pollers
                    .fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()),TimeUnit.SECONDS)
                    .receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
                    .maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
                ))
        .handle(inBoundHandler())
        .get();
  }

  public MessageHandler inBoundHandler() {
    return message -> {
      File file = (File) message.getPayload();
      messageHandler.handleMessage(file);
    };
  }

出站适配器集成流程:

  private IntegrationFlow getoutboundIntegrationFlow() {

    return IntegrationFlows.from("sftpOutboundChannel")
        .handle(Sftp.outboundAdapter(sftpSessionFactory(),FileExistsMode.FAIL)
            .remoteDirectoryExpression(String.format("headers['%s']",FileHeaders.REMOTE_DIRECTORY))).get();
  }

  @Bean("sftpOutboundChannel")
  public MessageChannel sftpOutboundChannel() {
    return new DirectChannel();
  }

SFTP消息处理程序:

  @Async("sftpHandlerAsyncExecutor")
  public void handleMessage(File originalFile) {

    File configFile = fetchConfigFile();

    /*
      process original file and store processed file in output file path on local directory
     */
      
    boolean success = uploadFiletoSftpServer(outputFilePath,client,entity);

    if (success) {
      deleteFileFromLocal(originalFile);
    }
  }

出站网关GET集成流程:

  private IntegrationFlow sftpGatewayGetIntegrationFlow() {
    return IntegrationFlows.from("sftpGetInputChannel")
        .handle(Sftp.outboundGateway(sftpSessionFactory(),AbstractRemoteFileOutboundGateway.Command.GET,"payload")
            .options(AbstractRemoteFileOutboundGateway.Option.DELETE,AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
            .localDirectoryExpression(String.format("headers['%s']",Constants.HEADER_LOCAL_DIRECTORY_NAME))
            .autocreateLocalDirectory(true))
        .channel("nullChannel")
        .get();
  }

  @Bean("sftpGetInputChannel")
  public MessageChannel sftpGetInputChannel() {
    return new DirectChannel();
  }

messageHandler.handleMessage()方法在异步中(使用ThreadPoolTask​​Executor)调用,该方法在内部使用出站网关获取配置文件。但是我找不到可以在同一线程中发送和接收消息有效负载的单个通道。我在春季文档中发现了MessagingTemplate,但是找不到将其与我的出站网关集成流程相连接的方法

sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel,new Genericmessage<>("/dir/file.csv",headers))在DirectChannel()中给出“ 调度程序没有频道订阅”的情况。

我正在寻找一种解决方案,可以通过以下任意一种方式从服务器获取所需文件

  • 使用适当的渠道将MessagingTemplate与IntegrationFlow集成(如果可能)。
  • 入站适配器流中的消息处理程序的一些链接,其中在轮询原始文件之后,它将使用sftp出站网关获取一个文件,然后使用两个对象(原始文件配置文件调用最终处理程序。我正在尝试使用上面的自定义代码来实现类似的目的。
  • 在多线程环境中为GET命令使用发送和轮询通道的任何其他方式。

应用程序需要在使用GET命令时在运行时确定目录路径

解决方法

您可能需要学习什么是@MessagingGateway,以及如何使其与出站网关上的渠道进行交互。

有关更多信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway

如果您确实希望获得配置文件,则不要执行.channel("nullChannel")。有了网关,将有replyChannel头和网关填充的TemporaryReplyChannel实例。然后在代码中,您将使用该功能接口作为API进行调用。

实际上,消息传递网关使用提到的MessagingTemplate.sendAndReceive()

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...