在运行时创建和绑定独占和自动删除 rabbitmq 队列,并定义了到期时间失败

问题描述

我有一个用例,我需要在运行时创建新队列,并为那些新创建的队列创建消费者。在运行时创建的队列应该是独占的,并在到期时自动删除。我遵循了建议的模式 - here 如果我将它们声明为独占和自动删除,则没有任何 x-expires 参数它可以工作。但是,如果我设置了它,每当应用程序尝试在运行时创建新队列时,我都会在控制台中看到一条错误消息。看起来参数名称错误的,或者可能不是 spring 内部所期望的。只是看看如何设置到期时间。 以下是我的课程:

import lombok.requiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;

import java.util.HashMap;
import java.util.Map;

import static org.springframework.amqp.support.AmqpHeaders.CONSUMER_QUEUE;

@Configuration
@EnableRabbit
@Slf4j
@requiredArgsConstructor
public class PrintJobListenerConfiguration
{

  @RabbitListener(queues = "${queue.name}")
  public void listen(@Header(CONSUMER_QUEUE) String queue) {
    log.info("Message read from the queue : {}",queue);
  }

  @Bean
  public Queue queue(@Value("${queue.name}") String name) {
    Map arg = new HashMap<>();
    arg.put("x-expires","20000");
    return new Queue(name,true,arg);
  }

  @Bean
  DirectExchange exchange(@Value("${exchange.name}") String exchange) {
    return new DirectExchange(exchange);
  }

  @Bean
  Binding binding(
      @Value("${routing.key}") String routingkey,Queue queue,DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingkey);
  }

  @Bean
  public RabbitAdmin admin(ConnectionFactory cf) {
    return new RabbitAdmin(cf);
  }
}

=========================================================================================

import com.mm.alchemy.dynamicqueue.PrintJobListenerConfiguration;
import com.mm.alchemy.print.properties.ApplicationProperties;
import lombok.requiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Slf4j
@Service
@requiredArgsConstructor
public class DynamicQueueListenerService {
  private final Map<String,ConfigurableApplicationContext> children = new HashMap<>();
  private final ApplicationContext context;
  private final ApplicationProperties applicationProperties;

  public void addNewDynamicQueueAndListener(String queue) {
    children.put(queue,addNewListener(queue));
  }

  private ConfigurableApplicationContext addNewListener(String queue) {
    AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
    child.setParent(context);
    ConfigurableEnvironment environment = child.getEnvironment();
    Properties properties = new Properties();
    properties.setProperty("queue.name",queue);
    properties.setProperty(
        "exchange.name",applicationProperties.getConsumer().getPrintJobRequestExchangeName());
    properties.setProperty("routing.key",queue);
    PropertiesPropertySource pps = new PropertiesPropertySource("props",properties);
    environment.getPropertySources().addLast(pps);
    child.register(PrintJobListenerConfiguration.class);
    child.refresh();
    return child;
  }
}


尝试在运行时创建队列时的失败堆栈跟踪:

2021-05-05 20:28:32.096  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable,auto-delete,or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true,auto-delete:true,exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive,but all messages will be lost.
2021-05-05 20:28:32.119 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,reply-text=PRECONDITION_Failed - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr},class-id=50,method-id=10)
2021-05-05 20:28:32.139  INFO 96868 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2021-05-05 20:28:33.143  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable,but all messages will be lost.
2021-05-05 20:28:33.150 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,method-id=10)
2021-05-05 20:28:35.157  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable,but all messages will be lost.
2021-05-05 20:28:35.165 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,method-id=10)
2021-05-05 20:28:39.171  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable,but all messages will be lost.
2021-05-05 20:28:39.178 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,method-id=10)
2021-05-05 20:28:44.182  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable,but all messages will be lost.
2021-05-05 20:28:44.189 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,method-id=10)
2021-05-05 20:28:44.193  INFO 96868 --- [  restartedMain] o.s.a.r.l.SimpleMessageListenerContainer : broker not available; cannot force queue declarations during start: java.io.IOException
2021-05-05 20:28:44.205  WARN 96868 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414
2021-05-05 20:28:44.209  WARN 96868 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration Failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:733) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:608) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:595) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1347) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1192) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:46) ~[amqp-client-5.10.0.jar:5.10.0]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at com.sun.proxy.$Proxy83.queueDeclarePassive(UnkNown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.3.6.jar:2.3.6]
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404,reply-text=NOT_FOUND - no queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/',method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.10.0.jar:5.10.0]
    ... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404,method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666) ~[amqp-client-5.10.0.jar:5.10.0]
    ... 1 common frames omitted

解决方法

arg.put("x-expires","20000");

reply-text=PRECONDITION_FAILED - 无效参数 'x-expires'

x-expires 是一个整数参数,而不是一个字符串。

我建议使用具有类型安全方法的 QueueBuilder...

@Bean
Queue queue() {
    return QueueBuilder.durable("queue")
            .autoDelete()
            .exclusive()
            .expires(20000)
            .build();
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...