问题描述
这是我的流 LS-GET(SFTP 出站网关:从远程 SFTP 服务器下载文件。) 和 MessagingGateway。
@MessagingGateway
public interface IntegratedRemoteFileProcessMessagingGateway {
@Gateway(requestChannel = "getFlows.input")
void getFlows(final String remoteDirectory);
@Gateway(requestChannel = "moveFlows.input")
void moveFlows(final String remoteDirectory);
}
@Bean
public QueueChannelSpec getoutputChannel() {
return MessageChannels.queue();
}
@Bean
public IntegrationFlow getFlows() {
return f -> f
.enrichHeaders(h -> h
.headerExpression("originalPayload","payload.toString()")
.headerExpression(FileHeaders.REMOTE_DIRECTORY,"payload.toString()"))
.log(LoggingHandler.Level.INFO,"eu.haee","'Header originalPayload=' + headers[originalPayload]")
.handle(Sftp.outboundGateway(sessionFactory,Command.LS.getCommand(),"payload")
.autocreateDirectory(false)
.autocreateLocalDirectory(false)
.charset("UTF-8")
.filter(new SftpSimplePatternFileListFilter("*.xml"))
.options(Option.NAME_ONLY,Option.RECURSIVE))
.split()
.log(LoggingHandler.Level.INFO,"'LS Payload= ' + payload.toString()")
.enrichHeaders(h -> h
.headerExpression("originalRemoteFile","payload.toString()")
.headerExpression(FileHeaders.REMOTE_FILE,"payload.toString()"))
.handle(Sftp.outboundGateway(sessionFactory,Command.GET.getCommand(),"headers['originalPayload'] + headers['file_remoteFile']")
.autocreateLocalDirectory(false)
.charset("UTF-8")
.fileNameExpression("headers['file_remoteFile']")
.localDirectory(new File(flowsConfiguration.localDirectory()))
.localFilenameExpression(new FunctionExpression<Message<?>>(m -> {
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(m);
final String remoteFileName = (String) accessor.getHeader("file_remoteFile");
final int extensionIndex = remoteFileName.lastIndexOf('.');
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSSS",Locale.GERMAN);
return String.format("%s_MYC(%s-%d)%s",remoteFileName.substring(0,extensionIndex),zoneddatetime.of(LocalDateTime.Now(),ZoneId.of("Europe/Berlin")).format(formatter),(new SecureRandom()).nextInt(99999),remoteFileName.substring(extensionIndex));
}))
.options(Option.PRESERVE_TIMESTAMP)
.remoteFileSeparator("/"))
.channel("getoutputChannel");
}
这是我的 spring-batch tasklet 和 Junit。 MessagingGateway 通过 tasklet 构造函数注入。
@Override
public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception {
for (Endpoint endpoint : endpoints) {
final String remoteDirectory = endpoint.getEpConUri();
logger.info("ProcessRemoteFilesFlowstasklet {} dealer at {} remote files process starting",endpoint.getId().getDlrCd(),remoteDirectory);
flowsMessagingGateway.getFlows(remoteDirectory);
}
return RepeatStatus.FINISHED;
}
@Override
public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception {
controlChannel.send(new Genericmessage<>("@getPoller.start()"));
logger.info("GetPollerRemoteFilesFlowstasklet poller starting...");
return RepeatStatus.FINISHED;
}
@Autowired
private IntegratedRemoteFileProcessMessagingGateway flowsMessagingGateway;
@Autowired
private EndpointRepository endpointRepository;
@Test
public void getFlows() {
flowsMessagingGateway.getFlows("/c07va00011/iris/import/");
Uninterruptibles.sleepUninterruptibly(60,TimeUnit.SECONDS);
}
当我执行 getFlows 测试代码时。我遇到了异常。但文件下载到我的本地计算机。 我不知道。我尝试了很多变体,但没有任何进展。
org.springframework.messaging.MessageDeliveryException: dispatcher has no subscribers for channel 'application.getFlows.input'.; nested exception is org.springframework.integration.MessagedispatchingException: dispatcher has no subscribers,FailedMessage=Genericmessage [payload=/c07va00011/iris/import/,headers={replyChannel=org.springframework.messaging.core.GenericmessagingTemplate$TemporaryReplyChannel@2e64ae1a,errorChannel=org.springframework.messaging.core.GenericmessagingTemplate$TemporaryReplyChannel@2e64ae1a,id=bd393cb7-42d0-03b2-674d-40e3cf9211de,timestamp=1609844917799}]
...
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: org.springframework.integration.MessagedispatchingException: dispatcher has no subscribers
at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:139)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 145 common frames omitted
@EnableIntegration 放置了每个与 spring-integration 相关的配置类。 @IntegrationComponentScan 还放置了我的主要流程配置类(带有要扫描的包名称字符串数组)。 如果@EnableIntegration 注解位于多个类中会发生什么?
我是否应该将所有 spring-batch 和 spring-integration 配置类合并为一个?
另外,我测试了 ControlBus(在 spring-batch tasklet 中向轮询器发送消息)并得到相同的异常。
11:57:36.481 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step startGetPollerRemoteFilesstep in job integratedFilesProcessJob2
org.springframework.messaging.MessageDeliveryException: dispatcher has no subscribers for channel 'application.controlChannel'.; nested exception is org.springframework.integration.MessagedispatchingException: dispatcher has no subscribers,FailedMessage=Genericmessage [payload=@getPoller.start(),headers={id=539a27d0-9bce-062d-8664-53aae14b5680,timestamp=1609930656454}]
@Lazy,@DependsOn 也不起作用。 (@Lazy 添加到 ControlBus,@DependsOn 添加到 spring 服务类:Spring-batch 作业也通过 rest API 调用手动启动/停止。)
@Autowired
public BatchFileServiceConfiguration(JobBuilderFactory jobBuilderFactory,StepBuilderFactory stepBuilderFactory,PropertyConfiguration propertyConfiguration,@Qualifier("sourceBatchTransactionManager") PlatformTransactionManager sourceBatchTransactionManager,@Qualifier("sourceBatchEntityManagerFactory") EntityManagerFactory sourceBatchEntityManagerFactory,@Qualifier("processFileTaskExecutor") TaskExecutor processFileTaskExecutor,BatchEndpointRepository batchEndpointRepository,RemoteFileProcessMessagingGateway remoteFileProcessMessagingGateway,@Lazy @Qualifier("controlChannel") MessageChannel controlChannel) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.propertyConfiguration = propertyConfiguration;
this.sourceBatchTransactionManager = sourceBatchTransactionManager;
this.sourceBatchEntityManagerFactory = sourceBatchEntityManagerFactory;
this.processFileTaskExecutor = processFileTaskExecutor;
this.batchEndpointRepository = batchEndpointRepository;
this.remoteFileProcessMessagingGateway = remoteFileProcessMessagingGateway;
this.controlChannel = controlChannel;
@Service
@DependsOn({"lsFlows","getFlows","moveFlows","moveFailedFlows","getPollableFlows"})
public class FileServiceImpl implements FileService {
这些异常在 spring-integration 独立应用程序中从未发生过。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)