问题描述
我正在使用 spring-integration-aws,以 autoStartup = "false" 的方式从 S3 Bucket 流式读取文件,
我需要使用控制总线(Control Bus)机制来启动通道适配器,但是发送启动命令时抛出了如下异常:
org.springframework.messaging.MessageHandlingException:消息处理程序中发生错误[[org.springframework.integration.handler.ExpressionCommandMessageProcessor@6f50d92a]的ServiceActivator(controlBus)];嵌套异常是org.springframework.expression.EvaluationException:此命令处理器不支持方法stop。如果使用控制总线,请考虑添加@ManagedOperation或@ManagedAttribute。
/**
 * Declares the streaming S3 message source whose messages are sent to {@code s3Channel}.
 *
 * <p>The polling endpoint created by {@code @InboundChannelAdapter} is a different bean
 * from the {@code MessageSource} returned here. Control-bus commands such as
 * {@code @s3ChannelAdapter.start()} must target the endpoint, because that is the
 * component that exposes lifecycle operations to the control bus. The endpoint is
 * therefore registered under the id {@code s3ChannelAdapter} via {@code @EndpointId},
 * and the message source itself is exposed as {@code s3ChannelAdapter.source}.
 *
 * <p>The adapter is declared with {@code autoStartup = "false"}, so it does not poll
 * until it is started explicitly.
 *
 * <p>NOTE(review): {@code @EndpointId} requires Spring Integration 5.0.4 or later — confirm
 * the project's version. Any code that looked the message source up by the old bean name
 * {@code s3ChannelAdapter} must use {@code s3ChannelAdapter.source} instead.
 *
 * @return the configured S3 streaming message source
 */
@Bean(name = "s3ChannelAdapter.source")
@org.springframework.integration.annotation.EndpointId("s3ChannelAdapter")
@InboundChannelAdapter(value = "s3Channel",autoStartup = "false",poller = @Poller(fixedDelay = FIXED_DELAY,maxMessagesPerPoll = MESSAGE_PER_POLL))
public MessageSource<InputStream> streamReadingMessageSource() {
    log.info("starting streamReadingMessageSource");
    S3StreamingMessageSource s3StreamingMessageSource = new S3StreamingMessageSource(template());
    // Remote directory is "<bucket>/<pending-location>".
    s3StreamingMessageSource.setRemoteDirectory(swiftFilesBucketName + "/" + fileLocationPending);
    s3StreamingMessageSource.setFilter(compositeFileListFilter());
    return s3StreamingMessageSource;
}
/**
 * Builds the filter chain applied to the S3 object listing.
 *
 * <p>Two filters are added, in this order: an accept-once filter backed by a new
 * {@code SimpleMetadataStore} with the key prefix {@code "streaming"}, and a
 * case-insensitive regex filter built from {@code FIN_FILE_EXTENSION_REGEX}.
 *
 * @return the composite filter
 */
private CompositeFileListFilter<S3ObjectSummary> compositeFileListFilter() {
    final Pattern finFilePattern = Pattern.compile(FIN_FILE_EXTENSION_REGEX,Pattern.CASE_INSENSITIVE);
    final CompositeFileListFilter<S3ObjectSummary> filters = new CompositeFileListFilter<>();
    filters.addFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),"streaming"));
    filters.addFilter(new S3RegexPatternFileListFilter(finFilePattern));
    return filters;
}
/**
 * Creates a new remote-file template on top of a session factory built from the
 * {@code amazonS3} client.
 *
 * @return a new {@code S3RemoteFileTemplate}
 */
private S3RemoteFileTemplate template() {
    final S3SessionFactory sessionFactory = new S3SessionFactory(amazonS3);
    return new S3RemoteFileTemplate(sessionFactory);
}
/**
 * Queue-backed channel bean, qualified as {@code s3Channel}.
 *
 * @return a new {@code QueueChannel}
 */
@Bean
@Qualifier("s3Channel")
public PollableChannel s3Channel() {
    final QueueChannel queue = new QueueChannel();
    return queue;
}
/**
 * Direct channel bean named {@code leaderFilterChannel}.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel leaderFilterChannel() {
    final DirectChannel channel = new DirectChannel();
    return channel;
}
/**
 * Direct channel bean named {@code processRequest}.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel processRequest() {
    final DirectChannel channel = new DirectChannel();
    return channel;
}
/**
 * Default poller bean, registered under {@code PollerMetadata.DEFAULT_POLLER}.
 *
 * <p>It uses a {@code PeriodicTrigger} constructed with a period of {@code 10}.
 *
 * @return the default poller metadata
 */
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    final PeriodicTrigger trigger = new PeriodicTrigger(10);
    final PollerMetadata metadata = new PollerMetadata();
    metadata.setTrigger(trigger);
    return metadata;
}
/**
 * Direct channel bean named {@code operationChannel}; the control bus is subscribed to
 * it through {@code @ServiceActivator(inputChannel = "operationChannel")}.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public DirectChannel operationChannel() {
    final DirectChannel channel = new DirectChannel();
    return channel;
}
/**
 * Control bus that handles messages arriving on {@code operationChannel}.
 *
 * <p>The factory-bean class is {@code ExpressionControlBusFactoryBean}. The previous
 * spelling, {@code ExpressionControlBusfactorybean}, does not name an existing class
 * and cannot compile.
 *
 * @return the control-bus factory bean
 */
@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
    return new ExpressionControlBusFactoryBean();
}
public class S3FileProcessingPollerController {
@Autowired
MessageChannel operationChannel;
/**
 * Sends the command {@code @s3ChannelAdapter.start()} to {@code operationChannel}.
 *
 * <p>Any exception raised while building or sending the message is logged at WARN
 * level and not rethrown.
 */
@ManagedOperation
public void startPoller() {
    log.info("method=startPoller invoked starting poller ");
    try {
        final String command = "@s3ChannelAdapter.start()";
        operationChannel.send(MessageBuilder.withPayload(command).build());
    } catch (Exception ex) {
        log.warn("Unable to perform start of file processor",ex);
    }
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)