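Problem description
I find myself in an environment where RabbitMQ is already provided as given infrastructure.
Service A writes to a Rabbit queue and service B reads from it. After reading the documentation intensively and setting things up correctly, the reading part, as a consumer via the Spring Cloud Stream binder, works like a charm.
But I cannot set up the producer that writes to the Rabbit queue.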
Setup
RabbitMQ (up and running)
- 1 exchange: myExchange
- 3 queues: myQueueA, myQueueB, myQueueC (bound to myExchange)
Producer service (service A mentioned above)
@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(RabbitChannelSource.class)
public class RabbitSender
{
    private final RabbitChannelSource rabbitChannelSource;
    public void sendMessage()
    {
        Message<String> msg = MessageBuilder.withPayload("TEEEEST").build();
        rabbitChannelSource.myOutput().send(msg);
    }
    // Binding interface: declares the output channel named "my-output"
    public interface RabbitChannelSource
    {
        String MY_OUTPUT_BINDING = "my-output";
        @Output(MY_OUTPUT_BINDING)
        MessageChannel myOutput();
    }
}
I am trying to get this working for one queue first, but ideally I will set the correct properties for all three queues.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=user
spring.rabbitmq.password=pass
## use existing rabbitmq via bindings
spring.cloud.stream.bindings.my-output.destination=my-output
#spring.cloud.stream.bindings.my-output.group=myQueueA
spring.cloud.stream.bindings.my-output.producer.required-groups=myQueueA
spring.cloud.stream.rabbit.bindings.my-output.producer.bind-queue=false
spring.cloud.stream.rabbit.bindings.my-output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.my-output.producer.queueNameGroupOnly=true
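bind-queue=false and declare-exchange=false are required because the RabbitMQ infrastructure is provided to me.
However, I keep getting the same exception and do not understand why. That is, I understand the immediate cause, since there is nothing subscribed to the channel to deliver the message to. So I suspect this has to do with application.properties.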
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.my-output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage [payload=byte[7],headers={contentType=application/json,id=98698b4c-61fa-596d-736e-f630d3ba4626,timestamp=1605105272723}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at de.techem.emsreceiver.rabbitmq.RabbitSender.sendMessage(RabbitSender.java:25)
at de.techem.emsreceiver.event.TriggeredEmsImport.execute(TriggeredImport.java:95)
at de.techem.emsreceiver.event.TriggeredEmsImport$$FastClassBySpringCGLIB$$d84f31a4.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at de.techem.emsreceiver.event.TriggeredEmsImport$$EnhancerBySpringCGLIB$$a0878aed.execute(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:305)
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:153)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:403)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:409)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360)
at org.springframework.boot.context.event.EventPublishingRunListener.running(EventPublishingRunListener.java:103)
at org.springframework.boot.SpringApplicationRunListeners.running(SpringApplicationRunListeners.java:77)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:330)
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:140)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:320)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:209)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:140)
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:379)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:268)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
at de.techem.emsreceiver.EmsReceiverApplication.main(EmsReceiverApplication.java:21)
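I want to be able to write to myQueueA, myQueueB or myQueueC based on a routingKey. So if I set the routingKey inside the application to myQA, the message should go to myQueueA; with myQB it should go to myQueueB, and with myQC it should go to myQueueC.
In other words, three different routing keys lead to the corresponding Rabbit queues.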
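The routing described above is what a RabbitMQ direct exchange does: a message is delivered to the queues whose binding key equals the message's routing key. Below is a minimal plain-Java sketch of that lookup with no broker involved. It assumes myExchange is a direct exchange and that the queues are bound with the keys myQA, myQB and myQC; the class DirectExchangeSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange; not RabbitMQ client code.
public class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Bind a queue to this exchange under the given binding key.
    public void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Return the queues that would receive a message with this routing key.
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, List.of());
    }

    // Assumed bindings for the setup in the question.
    public static DirectExchangeSketch questionSetup() {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("myQueueA", "myQA");
        exchange.bind("myQueueB", "myQB");
        exchange.bind("myQueueC", "myQC");
        return exchange;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = questionSetup();
        for (String key : List.of("myQA", "myQB", "myQC", "unknown")) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

In this sketch a routing key without a matching binding simply matches no queue, so the bindings on the broker have to use exactly the keys the producer sends.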
I would appreciate any help, since I have tried many things without success. Thanks!
Solution
Perhaps you are trying to use the binding before it has been bound? This works fine for me:
spring.cloud.stream.bindings.output.destination=my-output
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
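import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Source.class)
public class So64788954Application {
    public static void main(String[] args) {
        SpringApplication.run(So64788954Application.class, args);
    }
    @Autowired
    Source source;
    // Runs after the context has started, so the output binding is already in place
    @Bean
    ApplicationRunner runner() {
        return args -> {
            // The routeTo header becomes the routing key (see routing-key-expression)
            source.output().send(MessageBuilder.withPayload("foo")
                    .setHeader("routeTo", "myQA")
                    .build());
            source.output().send(MessageBuilder.withPayload("bar")
                    .setHeader("routeTo", "myQB")
                    .build());
            source.output().send(MessageBuilder.withPayload("baz")
                    .setHeader("routeTo", "myQC")
                    .build());
        };
    }
    // Consumes from all three queues to show where each message ended up
    @RabbitListener(queues = { "myQueueA", "myQueueB", "myQueueC" })
    void listen(String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + ",from: " + queue);
    }
}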
foo,from: myQueueA
bar,from: myQueueB
baz,from: myQueueC
When you bring your own queues and exchange, required-groups is not needed on the producer side.
The @RabbitListener consumes the messages sent to the 3 queues.
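The routing-key-expression=headers['routeTo'] property in the configuration above makes the binder read the routeTo header of each outgoing message and use its value as the routing key. A rough plain-Java analogue of that lookup is sketched here; the real binder evaluates a SpEL expression against the message, and the RoutingKeyFromHeaderSketch class and its Msg type are hypothetical names for illustration only.

```java
import java.util.Map;

// Hypothetical plain-Java analogue of routing-key-expression=headers['routeTo'].
public class RoutingKeyFromHeaderSketch {
    // Minimal message: a payload plus headers.
    public static final class Msg {
        public final String payload;
        public final Map<String, Object> headers;

        public Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Look up the routeTo header; returns null when the header is absent.
    public static String routingKey(Msg msg) {
        Object value = msg.headers.get("routeTo");
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Msg foo = new Msg("foo", Map.of("routeTo", "myQA"));
        System.out.println(foo.payload + " is published with routing key " + routingKey(foo));
    }
}
```

Because the key is computed per message, a single output binding is enough to reach all three queues.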
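By the way, the annotation-based model is deprecated; the functional programming model is now preferred, where we use StreamBridge for output (and Consumer<?> or Function<?,?> for consuming and processing, respectively).
Here is the equivalent of the above: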
spring.cloud.stream.rabbit.bindings.my-output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.my-output.producer.routing-key-expression=headers['routeTo']
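import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class So64788954Application {
    public static void main(String[] args) {
        SpringApplication.run(So64788954Application.class, args);
    }
    @Autowired
    StreamBridge bridge;
    @Bean
    ApplicationRunner runner() {
        return args -> {
            bridge.send("my-output", MessageBuilder.withPayload("foo")
                    .setHeader("routeTo", "myQA")
                    .build());
            bridge.send("my-output", MessageBuilder.withPayload("bar")
                    .setHeader("routeTo", "myQB")
                    .build());
            bridge.send("my-output", MessageBuilder.withPayload("baz")
                    .setHeader("routeTo", "myQC")
                    .build());
        };
    }
    @RabbitListener(queues = { "myQueueA", "myQueueB", "myQueueC" })
    void listen(String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + ",from: " + queue);
    }
}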
Following Gary's suggestion, I run into the same error as in my first post.
application.properties
# --- spring cloud ---
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=username
spring.rabbitmq.password=password
## use existing rabbitmq via bindings
spring.cloud.stream.bindings.output.destination=my-output
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
RabbitTrigger.java
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitTrigger
{
    private final RabbitSender rabbitSender;
    @EventListener(ApplicationReadyEvent.class)
    public void execute()
    {
        rabbitSender.testSending();
    }
}
RabbitSender.java
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableBinding(Source.class)
public class RabbitSender
{
    private final Source source;
    public void testSending()
    {
        Message<String> testMessage = MessageBuilder.withPayload("TEEEEST")
                .setHeader("routeTo", "CRE")
                .build();
        source.output().send(testMessage);
    }
}
But the exception I get is the same as in my original post. I think I am missing a key piece here?
2020-11-25 16:27:17,116 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage [payload=byte[7],headers={routeTo=CRE,id=15479810-375f-38e8-a692-e84c6ede01d7,contentType=application/json,timestamp=1606318037108}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at de.techem.emsreceiver.rabbitmq.RabbitSender.testSending(RabbitSender.java:34)
at de.techem.emsreceiver.event.RabbitTrigger.execute(RabbitTrigger.java:96)
at de.techem.emsreceiver.event.RabbitTrigger$$FastClassBySpringCGLIB$$d84f31a4.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at de.techem.emsreceiver.event.RabbitTrigger$$EnhancerBySpringCGLIB$$d00eff49.execute(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:305)
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:153)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:403)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:409)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360)
at org.springframework.boot.context.event.EventPublishingRunListener.running(EventPublishingRunListener.java:103)
at org.springframework.boot.SpringApplicationRunListeners.running(SpringApplicationRunListeners.java:77)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:330)
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:140)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:320)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:209)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:140)
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:379)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:268)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
at de.techem.emsreceiver.EmsReceiverApplication.main(EmsReceiverApplication.java:21)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 60 common frames omitted
For me, setting up the producer is far more complicated than the consumer side.
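For what it is worth, "Dispatcher has no subscribers" means that send() reached a channel to which nothing was subscribed at that moment. In both traces, the frames below the event listener include BindingService.bindProducer, which suggests the ApplicationReadyEvent listener ran while the producer binding was still being created. The plain-Java sketch below only illustrates that ordering problem; NoSubscriberSketch and its methods are hypothetical and do not reproduce Spring Integration's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a subscribable channel; not Spring Integration code.
public class NoSubscriberSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Stands in for the binder attaching its handler once the binding is complete.
    public void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Deliver to the first subscriber, or fail if nothing is subscribed yet.
    public void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        subscribers.get(0).accept(payload);
    }

    // Returns true if sending succeeded, false if the channel had no subscriber.
    public static boolean trySend(NoSubscriberSketch channel, String payload) {
        try {
            channel.send(payload);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        NoSubscriberSketch channel = new NoSubscriberSketch();
        System.out.println("sent before binding: " + trySend(channel, "TEEEEST"));
        List<String> delivered = new ArrayList<>();
        channel.subscribe(delivered::add);
        System.out.println("sent after binding: " + trySend(channel, "TEEEEST"));
    }
}
```

If that reading of the traces is right, the send has to be triggered only after the binding is in place, which is what the ApplicationRunner in the answer above achieves.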