Spring Integration AWS文件从s3流式传输,如何启动@InboundChannelAdapter?

问题描述

我正在使用spring-integrtion-aws

enter image description here

通过autoStartup =“ false”从s3 Bucket流文件

我需要使用控制总线机制来启动通道,但是这很不正确

org.springframework.messaging.MessageHandlingException:消息处理程序中发生错误[[org.springframework.integration.handler.ExpressionCommandMessageProcessor@6f50d92a]的ServiceActivator(controlBus)];嵌套异常是org.springframework.expression.EvaluationException:此命令处理器不支持方法stop。如果使用控制总线,请考虑添加@ManagedOperation或@ManagedAttribute。

 @Bean(name = "s3ChannelAdapter")
   @InboundChannelAdapter(value = "s3Channel",autoStartup = "false",poller = @Poller(fixedDelay = FIXED_DELAY,maxMessagesPerPoll = MESSAGE_PER_POLL))
   public MessageSource<InputStream> streamReadingMessageSource() {
      log.info("starting streamReadingMessageSource");
      S3StreamingMessageSource s3StreamingMessageSource = new S3StreamingMessageSource(template());
      s3StreamingMessageSource.setRemoteDirectory(swiftFilesBucketName + "/" + fileLocationPending);
      s3StreamingMessageSource.setFilter(compositeFileListFilter());
      return s3StreamingMessageSource;
   }

   private CompositeFileListFilter<S3ObjectSummary> compositeFileListFilter() {
      Pattern pattern = Pattern.compile(FIN_FILE_EXTENSION_REGEX,Pattern.CASE_INSENSITIVE);
      CompositeFileListFilter<S3ObjectSummary> compositeFileListFilter = new CompositeFileListFilter<>();
      compositeFileListFilter
         .addFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),"streaming"))
         .addFilter(new S3RegexPatternFileListFilter(pattern));
      return compositeFileListFilter;
   }

   private S3RemoteFileTemplate template() {
      return new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
   }

   @Bean
   @Qualifier("s3Channel")
   public PollableChannel s3Channel() {
      return new QueueChannel();
   }

   @Bean
   public MessageChannel leaderFilterChannel() {
      return new DirectChannel();
   }

   @Bean
   public MessageChannel processRequest() {
      return new DirectChannel();
   }

   @Bean(name = PollerMetadata.DEFAULT_POLLER)
   public PollerMetadata defaultPoller() {
      PollerMetadata pollerMetadata = new PollerMetadata();
      pollerMetadata.setTrigger(new PeriodicTrigger(10));
      return pollerMetadata;
   }

   @Bean
   public DirectChannel operationChannel() {
      return new DirectChannel();
   }

   @Bean
   @ServiceActivator(inputChannel = "operationChannel")
   public ExpressionControlBusfactorybean controlBus() {
      return new ExpressionControlBusfactorybean();
   }



我公开了一个mbean来调用通道适配器的启动

public class S3FileProcessingPollerController {

   @Autowired
   MessageChannel operationChannel;


   @ManagedOperation
   public void startPoller() {
      log.info("method=startPoller invoked starting poller ");
      try {

         Message<String> startPollerMessage = MessageBuilder.withPayload("@s3ChannelAdapter.start()").build();
         operationChannel.send(startPollerMessage);

      } catch (Exception e) {
         log.warn("Unable to perform start of file processor",e);
      }


   }

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)