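Problem description
I have a use case where I need to create new queues at runtime and create consumers for those newly created queues. The queues created at runtime should be exclusive and should be deleted automatically when they expire. I followed the pattern suggested here. If I declare the queues as exclusive and auto-delete without any x-expires
argument, it works. However, if I set it, I see an error message in the console every time the application tries to create a new queue at runtime. It looks like the argument name is wrong, or perhaps it is not what Spring expects internally. I am just looking for how to set the expiry time.
Here are my classes: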
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import java.util.HashMap;
import java.util.Map;
import static org.springframework.amqp.support.AmqpHeaders.CONSUMER_QUEUE;

@Configuration
@EnableRabbit
@Slf4j
@RequiredArgsConstructor
public class PrintJobListenerConfiguration
{
    // Logs the name of the queue each message was consumed from.
    @RabbitListener(queues = "${queue.name}")
    public void listen(@Header(CONSUMER_QUEUE) String queue) {
        log.info("Message read from the queue : {}", queue);
    }

    @Bean
    public Queue queue(@Value("${queue.name}") String name) {
        Map<String, Object> arg = new HashMap<>();
        arg.put("x-expires", "20000");
        // durable, exclusive, auto-delete: all true, matching the flags in the log output
        return new Queue(name, true, true, true, arg);
    }

    @Bean
    DirectExchange exchange(@Value("${exchange.name}") String exchange) {
        return new DirectExchange(exchange);
    }

    @Bean
    Binding binding(
            @Value("${routing.key}") String routingkey, Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    // Declares the queue, exchange and binding beans of this context on the broker.
    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }
}
=========================================================================================
import com.mm.alchemy.dynamicqueue.PrintJobListenerConfiguration;
import com.mm.alchemy.print.properties.ApplicationProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Slf4j
@Service
@RequiredArgsConstructor
public class DynamicQueueListenerService {
    // One child application context per dynamically created queue.
    private final Map<String, ConfigurableApplicationContext> children = new HashMap<>();
    private final ApplicationContext context;
    private final ApplicationProperties applicationProperties;

    public void addNewDynamicQueueAndListener(String queue) {
        children.put(queue, addNewListener(queue));
    }

    // Creates a child context whose environment supplies the queue, exchange and
    // routing key placeholders used by PrintJobListenerConfiguration.
    private ConfigurableApplicationContext addNewListener(String queue) {
        AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
        child.setParent(context);
        ConfigurableEnvironment environment = child.getEnvironment();
        Properties properties = new Properties();
        properties.setProperty("queue.name", queue);
        properties.setProperty(
                "exchange.name", applicationProperties.getConsumer().getPrintJobRequestExchangeName());
        properties.setProperty("routing.key", queue);
        PropertiesPropertySource pps = new PropertiesPropertySource("props", properties);
        environment.getPropertySources().addLast(pps);
        child.register(PrintJobListenerConfiguration.class);
        child.refresh();
        return child;
    }
}
Failure stack trace when trying to create the queue at runtime:
2021-05-05 20:28:32.096 INFO 96868 --- [ restartedMain] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable,auto-delete,or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true,auto-delete:true,exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive,but all messages will be lost.
2021-05-05 20:28:32.119 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr},class-id=50,method-id=10)
2021-05-05 20:28:32.139 INFO 96868 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729
2021-05-05 20:28:33.143 INFO 96868 --- [ restartedMain] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable,but all messages will be lost.
2021-05-05 20:28:33.150 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,method-id=10)
2021-05-05 20:28:35.157 INFO 96868 --- [ restartedMain] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable,but all messages will be lost.
2021-05-05 20:28:35.165 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,method-id=10)
2021-05-05 20:28:39.171 INFO 96868 --- [ restartedMain] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable,but all messages will be lost.
2021-05-05 20:28:39.178 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,method-id=10)
2021-05-05 20:28:44.182 INFO 96868 --- [ restartedMain] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable,but all messages will be lost.
2021-05-05 20:28:44.189 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406,method-id=10)
2021-05-05 20:28:44.193 INFO 96868 --- [ restartedMain] o.s.a.r.l.SimpleMessageListenerContainer : broker not available; cannot force queue declarations during start: java.io.IOException
2021-05-05 20:28:44.205 WARN 96868 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414
2021-05-05 20:28:44.209 WARN 96868 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:733) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:608) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:595) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1347) ~[spring-rabbit-2.3.6.jar:2.3.6]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1192) ~[spring-rabbit-2.3.6.jar:2.3.6]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:46) ~[amqp-client-5.10.0.jar:5.10.0]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157) ~[spring-rabbit-2.3.6.jar:2.3.6]
at com.sun.proxy.$Proxy83.queueDeclarePassive(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.3.6.jar:2.3.6]
... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404,reply-text=NOT_FOUND - no queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/',method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.10.0.jar:5.10.0]
... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404,method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-5.10.0.jar:5.10.0]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666) ~[amqp-client-5.10.0.jar:5.10.0]
... 1 common frames omitted
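Solution
arg.put("x-expires","20000");
reply-text=PRECONDITION_FAILED - invalid arg 'x-expires'
x-expires is an integer argument, not a string. The {unacceptable_type,longstr} in the broker's reply shows that it received a string value.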
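If you prefer to keep the arguments map, a minimal sketch of the fix is to store the value as a number instead of a string. The class and method names below (QueueArguments, expiringQueueArguments) are hypothetical and only illustrate the type difference; the block has no Spring dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring AMQP: builds the optional
// arguments map for a queue that should expire after a period of non-use.
final class QueueArguments {

    private QueueArguments() {
    }

    static Map<String, Object> expiringQueueArguments(int expiresMillis) {
        Map<String, Object> arguments = new HashMap<>();
        // Autoboxed to Integer, so the value is sent as a numeric field
        // rather than as the string the broker rejected.
        arguments.put("x-expires", expiresMillis);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = expiringQueueArguments(20000);
        Object value = arguments.get("x-expires");
        System.out.println(value + " is a " + value.getClass().getSimpleName());
    }
}
```

A map built this way could then be passed as the last argument of new Queue(name, true, true, true, arguments) in the configuration class.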
I suggest using QueueBuilder, which has type-safe methods:
...
@Bean
Queue queue() {
    return QueueBuilder.durable("queue")
            .autoDelete()
            .exclusive()
            .expires(20000)
            .build();
}