How can I manually stop or start an @InboundChannelAdapter?

Problem description

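Hi everyone,

I am working with Spring Cloud Data Flow.

I created the application and it runs fine, but we have a requirement that it can be started or stopped whenever we need.

If I set autoStartup="false", it does not start initially, but I don't know how to start or stop it afterwards.

Most places only show XML configuration.

I tried some code from online articles, but it did not work.

Does anyone know how to solve this? An example would be very helpful.

Actually, with autoStartup set to false I can start the service from a CommandLineRunner. But if I try the same thing through a REST endpoint, it throws an error.

Below is the snippet from my code.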

package com.javatechie.service;

import com.javatechie.impl.ProductBuilder;
import com.javatechie.TbeSource;
import com.javatechie.model.Product;
import com.javatechie.stopping.StopPollingAdvice;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.integration.annotation.EndpointId;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.ExpressionControlBusFactoryBean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

@Component
@EnableIntegration
@RestController
public class ProcessingCl {

    @Autowired
    ApplicationContext ac;


    @Bean
    @Scope("singleton")
    private ProductBuilder dataAccess() {
        return new ProductBuilder();
    }


    // Polling source; registered under the id "inboundtest" and not started automatically.
    @Bean
    @EndpointId("inboundtest")
    @InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "100", errorChannel = "errorchannel"), autoStartup = "false")
    public Supplier<Product> getProductSource(ProductBuilder dataAccess) {
        return () -> dataAccess.getNext();
    }

    @Bean
    MessageChannel controlChannel() {
        return new DirectChannel();
    }

    // Control bus that evaluates commands arriving on controlChannel.
    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    ExpressionControlBusFactoryBean controlBus() {
        return new ExpressionControlBusFactoryBean();
    }

    @Bean
    CommandLineRunner commandLineRunner(@Qualifier("controlChannel") MessageChannel controlChannel) {
        return (String[] args) -> {
            System.out.println("Starting incoming file adapter: ");
            // Uses the injected controlChannel bean to start the adapter.
            boolean sent = controlChannel.send(new GenericMessage<>("@inboundtest.start()"));
            System.out.println("Sent control message successfully? " + sent);
            // Keep running until something is typed on stdin.
            while (System.in.available() == 0) {
                Thread.sleep(50);
            }
        };
    }

    // These two endpoints are the ones that throw the error shown below.
    @GetMapping("/stop11")
    void test() {
        controlChannel().send(new GenericMessage<>("@inboundtest.stop()"));
        System.out.println("it is in call method to stop");
    }


    @GetMapping("/start11")
    void test1() {
        controlChannel().send(new GenericMessage<>("@inboundtest.start()"));
        System.out.println("it is in call method to start");
    }
  

}

Below is the error message.

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage [payload=@inboundtest.start(),headers={id=82c74865-2f68-c192-7d48-501bf3b28e02,timestamp=1608643036650}],timestamp=1608643036650}]] with root cause

org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers

Solution

The control bus, @EndpointId, and a command such as @inboundtest.start() are the way to go.
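To make the idea concrete, here is a minimal plain-Java sketch of what a control bus does conceptually: it takes a command string, looks up an endpoint by its id, and calls a lifecycle method on it. This is a toy model, not Spring Integration's implementation (the real control bus evaluates the payload as an expression against beans in the application context). The names ToyEndpoint, ToyControlBus and ToyControlBusDemo are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for an endpoint with a start/stop lifecycle. */
class ToyEndpoint {
    // Starts out stopped, which models autoStartup = "false".
    private boolean running;

    void start() { running = true; }
    void stop() { running = false; }
    boolean isRunning() { return running; }
}

/** Toy control bus: resolves "@id.start()" or "@id.stop()" against a registry. */
class ToyControlBus {
    private static final Pattern COMMAND =
            Pattern.compile("@(\\w+)\\.(start|stop)\\(\\)");

    private final Map<String, ToyEndpoint> endpoints = new HashMap<>();

    /** Registers an endpoint under an id, similar in spirit to @EndpointId. */
    void register(String id, ToyEndpoint endpoint) {
        endpoints.put(id, endpoint);
    }

    /** Parses the command and invokes the matching lifecycle method. */
    void handle(String command) {
        Matcher m = COMMAND.matcher(command);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported command: " + command);
        }
        ToyEndpoint endpoint = endpoints.get(m.group(1));
        if (endpoint == null) {
            throw new IllegalArgumentException("No endpoint named " + m.group(1));
        }
        if (m.group(2).equals("start")) {
            endpoint.start();
        } else {
            endpoint.stop();
        }
    }
}

class ToyControlBusDemo {
    public static void main(String[] args) {
        ToyEndpoint adapter = new ToyEndpoint();
        ToyControlBus bus = new ToyControlBus();
        bus.register("inboundtest", adapter);

        bus.handle("@inboundtest.start()");
        System.out.println("running after start: " + adapter.isRunning());

        bus.handle("@inboundtest.stop()");
        System.out.println("running after stop: " + adapter.isRunning());
    }
}
```

In the real application the same role is played by the controlBus bean subscribed to controlChannel, so any component that sends "@inboundtest.start()" or "@inboundtest.stop()" to that channel bean toggles the adapter.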

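The only problem is that you don't show a subscriber for that TbeSource.PR1 channel. This may be the piece of Spring Integration you are missing: you start producing messages from the polling channel adapter, but nothing, such as a service activator, takes the TbeSource.PR1 channel as its input.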
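The "Dispatcher has no subscribers" error means a message was sent to a subscribable channel that has no handler attached. The plain-Java sketch below models that behaviour with a toy channel; ToyDirectChannel and ToyChannelDemo are hypothetical names and this is not Spring's DirectChannel. It also illustrates one thing that may be worth checking in the posted code, as an assumption not confirmed in this thread: the failing message carries the payload @inboundtest.start() and the channel is reported as 'unknown.channel.name', and in a class annotated with @Component rather than @Configuration a direct call to a @Bean method such as controlChannel() is an ordinary Java call that returns a new object, so the REST methods might be sending to a fresh channel instance that nothing subscribes to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a subscribable channel; not Spring's DirectChannel. */
class ToyDirectChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    /** Delivers to a subscriber, or fails if there is none. */
    boolean send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        // Simplification: always hand the message to the first subscriber.
        subscribers.get(0).accept(payload);
        return true;
    }
}

class ToyChannelDemo {
    /** Mirrors a factory method that builds a new channel on every call. */
    static ToyDirectChannel newChannel() {
        return new ToyDirectChannel();
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        // The instance that a handler was subscribed to accepts the message.
        ToyDirectChannel wired = newChannel();
        wired.subscribe(received::add);
        wired.send("@inboundtest.start()");
        System.out.println("delivered: " + received);

        // A second call creates a different instance with no subscribers.
        try {
            newChannel().send("@inboundtest.start()");
        } catch (IllegalStateException ex) {
            System.out.println("failed: " + ex.getMessage());
        }
    }
}
```

If that assumption applies, injecting the MessageChannel bean into the controller (as the CommandLineRunner in the question already does through its @Qualifier("controlChannel") parameter) and sending to the injected instance would avoid the fresh, unsubscribed channel.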

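I see you say it is Data Flow, but from your description and configuration it is not clear where the binding for the TbeSource.PR1 destination is. Do you have something like @EnableBinding and a corresponding Source definition?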