我的集成流程和 tasklet 有什么问题

问题描述

这是我的流 LS-GET(SFTP 出站网关:从远程 SFTP 服务器下载文件。) 和 MessagingGateway。

    @MessagingGateway
    public interface IntegratedRemoteFileProcessMessagingGateway {
    
      @Gateway(requestChannel = "getFlows.input")
      void getFlows(final String remoteDirectory);
    
      @Gateway(requestChannel = "moveFlows.input")
      void moveFlows(final String remoteDirectory);
   }

    @Bean
    public QueueChannelSpec getoutputChannel() {
        return MessageChannels.queue();
    }
    
    @Bean
    public IntegrationFlow getFlows() {
        return f -> f
                .enrichHeaders(h -> h
                        .headerExpression("originalPayload","payload.toString()")
                        .headerExpression(FileHeaders.REMOTE_DIRECTORY,"payload.toString()"))
                .log(LoggingHandler.Level.INFO,"eu.haee","'Header originalPayload=' + headers[originalPayload]")
                .handle(Sftp.outboundGateway(sessionFactory,Command.LS.getCommand(),"payload")
                        .autocreateDirectory(false)
                        .autocreateLocalDirectory(false)
                        .charset("UTF-8")
                        .filter(new SftpSimplePatternFileListFilter("*.xml"))
                        .options(Option.NAME_ONLY,Option.RECURSIVE))
                .split()
                .log(LoggingHandler.Level.INFO,"'LS Payload= ' + payload.toString()")
                .enrichHeaders(h -> h
                        .headerExpression("originalRemoteFile","payload.toString()")
                        .headerExpression(FileHeaders.REMOTE_FILE,"payload.toString()"))
                .handle(Sftp.outboundGateway(sessionFactory,Command.GET.getCommand(),"headers['originalPayload'] + headers['file_remoteFile']")
                        .autocreateLocalDirectory(false)
                        .charset("UTF-8")
                        .fileNameExpression("headers['file_remoteFile']")
                        .localDirectory(new File(flowsConfiguration.localDirectory()))
                        .localFilenameExpression(new FunctionExpression<Message<?>>(m -> {
                            IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(m);
                            final String remoteFileName = (String) accessor.getHeader("file_remoteFile");
                            final int extensionIndex = remoteFileName.lastIndexOf('.');
                            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSSS",Locale.GERMAN);
                            return String.format("%s_MYC(%s-%d)%s",remoteFileName.substring(0,extensionIndex),zoneddatetime.of(LocalDateTime.Now(),ZoneId.of("Europe/Berlin")).format(formatter),(new SecureRandom()).nextInt(99999),remoteFileName.substring(extensionIndex));
                        }))
                        .options(Option.PRESERVE_TIMESTAMP)
                        .remoteFileSeparator("/"))
                .channel("getoutputChannel");
    }

这是我的 spring-batch tasklet 和 Junit。 MessagingGateway 通过 tasklet 构造函数注入。

    @Override
    public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception {
        for (Endpoint endpoint : endpoints) {
            final String remoteDirectory = endpoint.getEpConUri();
            logger.info("ProcessRemoteFilesFlowstasklet {} dealer at {} remote files process starting",endpoint.getId().getDlrCd(),remoteDirectory);
            flowsMessagingGateway.getFlows(remoteDirectory);
        }
        return RepeatStatus.FINISHED;
    }
@Override
    public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext) throws Exception {
        controlChannel.send(new Genericmessage<>("@getPoller.start()"));
        logger.info("GetPollerRemoteFilesFlowstasklet poller starting...");
        return RepeatStatus.FINISHED;
    }
@Autowired
    private IntegratedRemoteFileProcessMessagingGateway flowsMessagingGateway;
        
    @Autowired
    private EndpointRepository endpointRepository;
        
    @Test
    public void getFlows() {
        flowsMessagingGateway.getFlows("/c07va00011/iris/import/");
        Uninterruptibles.sleepUninterruptibly(60,TimeUnit.SECONDS);
    }

当我执行 getFlows 测试代码时。我遇到了异常。但文件下载到我的本地计算机。 我不知道。我尝试了很多变体,但没有任何进展。

org.springframework.messaging.MessageDeliveryException: dispatcher has no subscribers for channel 'application.getFlows.input'.; nested exception is org.springframework.integration.MessagedispatchingException: dispatcher has no subscribers,FailedMessage=Genericmessage [payload=/c07va00011/iris/import/,headers={replyChannel=org.springframework.messaging.core.GenericmessagingTemplate$TemporaryReplyChannel@2e64ae1a,errorChannel=org.springframework.messaging.core.GenericmessagingTemplate$TemporaryReplyChannel@2e64ae1a,id=bd393cb7-42d0-03b2-674d-40e3cf9211de,timestamp=1609844917799}]
...
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: org.springframework.integration.MessagedispatchingException: dispatcher has no subscribers
    at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:139)
    at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 145 common frames omitted

@EnableIntegration 放置了每个与 spring-integration 相关的配置类。 @IntegrationComponentScan 还放置了我的主要流程配置类(带有要扫描的包名称字符串数组)。 如果@EnableIntegration 注解位于多个类中会发生什么?

我是否应该将所有 spring-batch 和 spring-integration 配置类合并为一个

另外,我测试了 ControlBus(在 spring-batch tasklet 中向轮询器发送消息)并得到相同的异常。

11:57:36.481 [main] ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step startGetPollerRemoteFilesstep in job integratedFilesProcessJob2
org.springframework.messaging.MessageDeliveryException: dispatcher has no subscribers for channel 'application.controlChannel'.; nested exception is org.springframework.integration.MessagedispatchingException: dispatcher has no subscribers,FailedMessage=Genericmessage [payload=@getPoller.start(),headers={id=539a27d0-9bce-062d-8664-53aae14b5680,timestamp=1609930656454}]

@Lazy,@DependsOn 也不起作用。 (@Lazy 添加到 ControlBus,@DependsOn 添加到 spring 服务类:Spring-batch 作业也通过 rest API 调用手动启动/停止。)

@Autowired
    public BatchFileServiceConfiguration(JobBuilderFactory jobBuilderFactory,StepBuilderFactory stepBuilderFactory,PropertyConfiguration propertyConfiguration,@Qualifier("sourceBatchTransactionManager") PlatformTransactionManager sourceBatchTransactionManager,@Qualifier("sourceBatchEntityManagerFactory") EntityManagerFactory sourceBatchEntityManagerFactory,@Qualifier("processFileTaskExecutor") TaskExecutor processFileTaskExecutor,BatchEndpointRepository batchEndpointRepository,RemoteFileProcessMessagingGateway remoteFileProcessMessagingGateway,@Lazy @Qualifier("controlChannel") MessageChannel controlChannel) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.propertyConfiguration = propertyConfiguration;
        this.sourceBatchTransactionManager = sourceBatchTransactionManager;
        this.sourceBatchEntityManagerFactory = sourceBatchEntityManagerFactory;
        this.processFileTaskExecutor = processFileTaskExecutor;
        this.batchEndpointRepository = batchEndpointRepository;
        this.remoteFileProcessMessagingGateway = remoteFileProcessMessagingGateway;
        this.controlChannel = controlChannel;

@Service
@DependsOn({"lsFlows","getFlows","moveFlows","moveFailedFlows","getPollableFlows"})
public class FileServiceImpl implements FileService {

这些异常在 spring-integration 独立应用程序中从未发生过。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...