使用 spring cloud 功能和 rabbitmq 自动创建队列交换

问题描述

我们正在使用新的 Spring Cloud 函数库创建 rabbitMq 使用者。

然而,我们发现在应用程序启动时,我们没有看到在 rabbitMq 实例上创建的队列或交换。

这是我们的配置。

spring:
  cloud:
    function:
      deFinition: someReceiver
    stream:
      binders:
        rabbit:
          type: rabbit
      bindings:
        someReceiver-in-0:
          consumer:
            max-attemps: 1
            batch-mode: true
          binder: rabbit
          destination: someExhange
          group: someQueue
      default-binder: rabbit
      rabbit:
        bindings:
          someReceiver-in-0:
            consumer:
              ackNowledge-mode: MANUAL
              auto-bind-dlq: true
              queue-name-group-only: true
              exchange-type: topic
              max-concurrency: 10
              prefetch: 200
              enable-batching: true
              batch-size: 10
              receive-timeout: 200
              dlq-dead-letter-exchange:

这是我们的消费者。

 @Bean
    public Consumer<Message<Long>> someReceiver() {
        return ....
    }

在日志中我们可以看到:

o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel errorChannel
o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel nullChannel
o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler _org.springframework.integration.errorLogger
o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel    : Channel 'X' has 1 subscriber(s).

我们遇到的问题是在应用程序启动时没有在 rabbitMq 代理上创建队列或交换。 我们期望在应用程序启动时应该创建一个名为 someQueue 的队列和一个名为 someExchange 的交换器

解决方法

直接从 https://start.spring.io 中按设计工作:

2021-04-19 12:02:07.476  INFO 28260 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.someReceiver-in-0' has 1 subscriber(s).
2021-04-19 12:02:07.563  INFO 28260 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel errorChannel
2021-04-19 12:02:07.595  INFO 28260 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel nullChannel
2021-04-19 12:02:07.600  INFO 28260 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel someReceiver-in-0
2021-04-19 12:02:07.613  INFO 28260 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler _org.springframework.integration.errorLogger
2021-04-19 12:02:07.631  INFO 28260 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-04-19 12:02:07.631  INFO 28260 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2021-04-19 12:02:07.631  INFO 28260 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-04-19 12:02:07.632  INFO 28260 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: rabbit
2021-04-19 12:02:07.740  INFO 28260 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: rabbit
2021-04-19 12:02:07.740  INFO 28260 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: rabbit
2021-04-19 12:02:07.792  INFO 28260 --- [           main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: someQueue,bound to: someExhange
2021-04-19 12:02:07.793  INFO 28260 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-04-19 12:02:07.938  INFO 28260 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#244e619a:0/SimpleConnection@73a0f2b [delegate=amqp://guest@127.0.0.1:5672/,localPort= 51143]
2021-04-19 12:02:07.991  INFO 28260 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'someQueue.errors' has 1 subscriber(s).
2021-04-19 12:02:07.991  INFO 28260 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'someQueue.errors' has 2 subscriber(s).
2021-04-19 12:02:08.002  INFO 28260 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started bean 'inbound.someQueue'
2021-04-19 12:02:08.010  INFO 28260 --- [           main] o.s.s.c.s.s.So67160902Application        : Started So67160902Application in 1.659 seconds (JVM running for 2.147)

我在 Rabbit MQ 管理控制台中看到了这一点:

enter image description here

您可能在配置中遗漏了某些依赖项或执行了其他操作,以阻止 RabbitMQ Binder 执行其操作。

这是我的 deps,在刚从 https://start.spring.io 生成的 pom 中:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <scope>test</scope>
        <classifier>test-binder</classifier>
        <type>test-jar</type>
    </dependency>
</dependencies>