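Problem description
Hi all,
I am working with Spring Cloud Data Flow.
I created the application and it works fine, but we have a requirement to start and stop it whenever we need to.
If I set autoStartup = "false", it does not start initially, but I don't know how to start or stop it afterwards.
Most of the examples I found use XML configuration only.
Does anyone know how to solve this? An example would be very helpful.
In fact, with autoStartup set to false I can start the service from a CommandLineRunner, but when I try the same thing from a REST endpoint, it throws an error.
Below is the code snippet.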
package com.javatechie.service;

import com.javatechie.impl.ProductBuilder;
import com.javatechie.TbeSource;
import com.javatechie.model.Product;
import com.javatechie.stopping.StopPollingAdvice;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.integration.annotation.EndpointId;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.ExpressionControlBusFactoryBean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

@Component
@EnableIntegration
@RestController
public class ProcessingCl {

    @Autowired
    ApplicationContext ac;

    @Bean
    @Scope("singleton")
    private ProductBuilder dataAccess() {
        return new ProductBuilder();
    }

    // Polled source; not started automatically because autoStartup = "false".
    @Bean
    @EndpointId("inboundtest")
    @InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "100", errorChannel = "errorchannel"), autoStartup = "false")
    public Supplier<Product> getProductSource(ProductBuilder dataAccess) {
        return () -> dataAccess.getNext();
    }

    @Bean
    MessageChannel controlChannel() {
        return new DirectChannel();
    }

    // Control bus: evaluates commands such as "@inboundtest.start()" sent to controlChannel.
    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    ExpressionControlBusFactoryBean controlBus() {
        ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
        return expressionControlBusFactoryBean;
    }

    // Starting the adapter from here works.
    @Bean
    CommandLineRunner commandLineRunner(@Qualifier("controlChannel") MessageChannel controlChannel) {
        return (String[] args) -> {
            System.out.println("Starting incoming file adapter: ");
            boolean sent = controlChannel.send(new GenericMessage<>("@inboundtest.start()"));
            System.out.println("Sent control message successfully? " + sent);
            while (System.in.available() == 0) {
                Thread.sleep(50);
            }
        };
    }

    // Calling either endpoint below throws the error shown after the code.
    @GetMapping("/stop11")
    void test() {
        controlChannel().send(new GenericMessage<>("@inboundtest.stop()"));
        System.out.println("it is in call method to stop");
    }

    @GetMapping("/start11")
    void test1() {
        controlChannel().send(new GenericMessage<>("@inboundtest.start()"));
        System.out.println("it is in call method to start");
    }
}
Below is the error message.
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=@inboundtest.start(), headers={id=82c74865-2f68-c192-7d48-501bf3b28e02, timestamp=1608643036650}], timestamp=1608643036650}]] with root cause
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
Solution
The control bus, the @EndpointId, and the @inboundtest.start() command are the way to go.
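The only problem is that you don't show a subscriber for that TbeSource.PR1 channel.
This is probably the part of Spring Integration you are missing: you start producing messages from the polling channel adapter, but nothing, such as a service activator, uses the TbeSource.PR1 channel as its input.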
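To make the "no subscribers" point concrete, here is a minimal plain-Java sketch. ToyDirectChannel and ToyChannelDemo are hypothetical names invented for this illustration; this is a simplified model of a synchronous channel, not Spring Integration's DirectChannel. It only shows that a send on such a channel has nowhere to go until some handler subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a synchronous, point-to-point channel (hypothetical class,
// NOT Spring Integration's DirectChannel).
class ToyDirectChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    boolean send(String payload) {
        if (subscribers.isEmpty()) {
            // Nothing consumes from this channel, so delivery fails.
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        // Simplification: always hand the payload to the first subscriber.
        subscribers.get(0).accept(payload);
        return true;
    }
}

class ToyChannelDemo {
    public static void main(String[] args) {
        ToyDirectChannel pr1 = new ToyDirectChannel();
        try {
            pr1.send("product-1");
        } catch (IllegalStateException e) {
            System.out.println("Without a subscriber: " + e.getMessage());
        }
        // Once a handler subscribes, the same kind of send succeeds.
        pr1.subscribe(p -> System.out.println("Handled " + p));
        pr1.send("product-2");
    }
}
```

In the real application, the handler role would be played by something like a method annotated with @ServiceActivator(inputChannel = TbeSource.PR1), or by a binding that consumes from that channel.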
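I see you say this is Data Flow, but it is not clear from your description and configuration where the binding for that TbeSource.PR1 destination is. Do you have something like @EnableBinding and a corresponding Source definition?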
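For reference, a binding of the kind asked about might look roughly like the sketch below. It assumes the annotation-based Spring Cloud Stream model (@EnableBinding and @Output, which were deprecated in 3.1 and removed in 4.0). The shape of TbeSource, the value "pr1", and the BindingConfig class are all assumptions, since the question does not show them.

```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.MessageChannel;

// Hypothetical shape of TbeSource; the real interface is not shown in the question.
interface TbeSource {

    String PR1 = "pr1"; // assumed channel/binding name

    @Output(PR1)
    MessageChannel pr1();
}

// Hypothetical configuration class that activates the binding.
@Configuration
@EnableBinding(TbeSource.class)
class BindingConfig {
}
```

With a binding like this in place, the binder consumes from the PR1 channel and forwards messages to the bound destination, so the channel is no longer without a subscriber.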