问题描述
我们正在使用新的 Spring Cloud 函数库创建 rabbitMq 使用者。
然而,我们发现在应用程序启动时,我们没有看到在 rabbitMq 实例上创建的队列或交换。
这是我们的配置。
spring:
cloud:
function:
deFinition: someReceiver
stream:
binders:
rabbit:
type: rabbit
bindings:
someReceiver-in-0:
consumer:
max-attemps: 1
batch-mode: true
binder: rabbit
destination: someExhange
group: someQueue
default-binder: rabbit
rabbit:
bindings:
someReceiver-in-0:
consumer:
ackNowledge-mode: MANUAL
auto-bind-dlq: true
queue-name-group-only: true
exchange-type: topic
max-concurrency: 10
prefetch: 200
enable-batching: true
batch-size: 10
receive-timeout: 200
dlq-dead-letter-exchange:
这是我们的消费者。
@Bean
public Consumer<Message<Long>> someReceiver() {
return ....
}
在日志中我们可以看到:
o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel
o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel nullChannel
o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler _org.springframework.integration.errorLogger
o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'X' has 1 subscriber(s).
我们遇到的问题是在应用程序启动时没有在 rabbitMq 代理上创建队列或交换。
我们期望在应用程序启动时应该创建一个名为 someQueue
的队列和一个名为 someExchange
的交换器
解决方法
直接从 https://start.spring.io 中按设计工作:
2021-04-19 12:02:07.476 INFO 28260 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.someReceiver-in-0' has 1 subscriber(s).
2021-04-19 12:02:07.563 INFO 28260 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel
2021-04-19 12:02:07.595 INFO 28260 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel nullChannel
2021-04-19 12:02:07.600 INFO 28260 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel someReceiver-in-0
2021-04-19 12:02:07.613 INFO 28260 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler _org.springframework.integration.errorLogger
2021-04-19 12:02:07.631 INFO 28260 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-19 12:02:07.631 INFO 28260 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2021-04-19 12:02:07.631 INFO 28260 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-04-19 12:02:07.632 INFO 28260 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: rabbit
2021-04-19 12:02:07.740 INFO 28260 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: rabbit
2021-04-19 12:02:07.740 INFO 28260 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: rabbit
2021-04-19 12:02:07.792 INFO 28260 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: someQueue,bound to: someExhange
2021-04-19 12:02:07.793 INFO 28260 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2021-04-19 12:02:07.938 INFO 28260 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#244e619a:0/SimpleConnection@73a0f2b [delegate=amqp://guest@127.0.0.1:5672/,localPort= 51143]
2021-04-19 12:02:07.991 INFO 28260 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'someQueue.errors' has 1 subscriber(s).
2021-04-19 12:02:07.991 INFO 28260 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'someQueue.errors' has 2 subscriber(s).
2021-04-19 12:02:08.002 INFO 28260 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started bean 'inbound.someQueue'
2021-04-19 12:02:08.010 INFO 28260 --- [ main] o.s.s.c.s.s.So67160902Application : Started So67160902Application in 1.659 seconds (JVM running for 2.147)
我在 Rabbit MQ 管理控制台中看到了这一点:
您可能在配置中遗漏了某些依赖项或执行了其他操作,以阻止 RabbitMQ Binder 执行其操作。
这是我的 deps,在刚从 https://start.spring.io 生成的 pom 中:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
</dependencies>