问题描述
我正在使用 spring-amqp 的 consumerBatchEnabled 批量处理消息。对于错误处理和重试,我使用死信交换机(DLX),并配合带退避策略和恢复器(recoverer)的 Spring Retry。
如果在主队列中处理批处理期间出现任何异常,我将拒绝整个批处理,然后死信队列侦听器单独处理消息。我已经为 DLQ 侦听器配置了 Advice 链以进行重试。想法是在丢弃消息之前在 DLQ 侦听器中进行固定次数的重试。
重试功能工作正常,恢复器也被调用,但消息在死信队列中保持未确认状态。根据我的理解,在 AmqpRejectAndDontRequeueException 的情况下,应该从队列中删除消息,但它似乎没有发生,我在这里遗漏了什么。 (如果我附加了 dlx,这也不起作用)
(使用默认侦听器容器工厂和 application.yml 中的重试属性,这可以按预期工作,但不适用于自定义侦听器容器工厂)
下面是测试代码:
配置类
package com.example.demo;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ topology and listener-container configuration for the demo.
 *
 * <p>Declares the main exchange/queue ({@code person.exchange} / {@code person.queue}),
 * the dead-letter exchange/queue ({@code deadLetterExchange} / {@code deadLetter.queue}),
 * and two listener container factories: one for batch consumption of the main queue and
 * one for the dead-letter queue with a stateless retry interceptor.
 */
@EnableRabbit
@Configuration
public class RabbitMQConfig {

    /** Direct exchange that {@code person.queue} names as its dead-letter exchange. */
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("deadLetterExchange");
    }

    /** Direct exchange to which the main queue is bound. */
    @Bean
    DirectExchange exchange() {
        return new DirectExchange("person.exchange");
    }

    /** Durable dead-letter queue. */
    @Bean
    Queue dlq() {
        return QueueBuilder.durable("deadLetter.queue").build();
    }

    /**
     * Durable main queue whose arguments point dead-lettered messages at
     * {@code deadLetterExchange} with routing key {@code deadLetter}.
     */
    @Bean
    Queue queue() {
        return QueueBuilder.durable("person.queue")
                .withArgument("x-dead-letter-exchange", "deadLetterExchange")
                .withArgument("x-dead-letter-routing-key", "deadLetter")
                .build();
    }

    /** Binds the dead-letter queue to the dead-letter exchange with key {@code deadLetter}. */
    @Bean
    Binding DLQbinding() {
        return BindingBuilder.bind(dlq()).to(deadLetterExchange()).with("deadLetter");
    }

    /** Binds the main queue to the main exchange with key {@code person.key}. */
    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("person.key");
    }

    /**
     * Connection factory for a broker on {@code localhost:5672} using the
     * {@code guest}/{@code guest} credentials.
     *
     * @return the caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        // @Autowired was dropped here: the method takes no parameters, so there is nothing to inject.
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    /**
     * @param connectionFactory the broker connection factory
     * @return a {@link RabbitAdmin} built on that connection factory
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * @param connectionFactory the broker connection factory
     * @return a template that uses the JSON message converter
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    /** @return the Jackson-based JSON message converter */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Container factory for the main queue: manual acknowledgements, prefetch 1000,
     * consumer-side batching with batch size 2 and a 1000 ms receive timeout.
     *
     * @param configurer        Spring Boot's configurer, applied after the explicit settings
     * @param connectionFactory the broker connection factory
     * @return the batch listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Fixed spelling: the API is setAcknowledgeMode(AcknowledgeMode), not "AckNowledgeMode".
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1000);
        factory.setBatchListener(true);
        factory.setBatchSize(2);
        factory.setConsumerBatchEnabled(true);
        factory.setReceiveTimeout(1000L);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * Container factory for the dead-letter queue: manual acknowledgements, one consumer,
     * prefetch 1000, and a stateless retry interceptor (3 attempts; back-off options
     * 2000 / 1 / 10000) whose recoverer is a {@link RejectAndDontRequeueRecoverer}.
     *
     * <p>NOTE(review): in MANUAL acknowledge mode the listener is expected to ack/nack every
     * delivery itself, so the exception thrown by the recoverer presumably does not make the
     * container reject the message; confirm against the Spring AMQP reference before relying
     * on this factory to drain the queue.
     *
     * @param configurer        Spring Boot's configurer, applied after the explicit settings
     * @param connectionFactory the broker connection factory
     * @return the dead-letter listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitDlqListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Fixed spelling: the API is setAcknowledgeMode(AcknowledgeMode), not "AckNowledgeMode".
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(1);
        factory.setPrefetchCount(1000);
        factory.setDefaultRequeueRejected(true);
        factory.setAdviceChain(RetryInterceptorBuilder.stateless()
                .backOffOptions(2000, 1, 10000)
                .maxAttempts(3)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build());
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}
监听器
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Batch listener on {@code person.queue}.
 *
 * <p>Every batch is rejected as a whole so that its messages are dead-lettered.
 */
@Component
public class RabbitMQConsumer {

    List<String> list = new ArrayList<>();

    private int i = 0;

    /**
     * Prints the batch size, then negatively acknowledges the whole batch without requeueing.
     *
     * @param messages the batch delivered by the container
     * @param channel  the channel the batch was received on
     * @throws InvalidPersonException declared by the signature; not thrown by this body
     * @throws IOException            if the nack cannot be sent
     */
    @RabbitListener(
            id = "myRabbitListener",
            queues = "person.queue",
            containerFactory = "rabbitListenerContainerFactory")
    public void getMessage(List<Message<Person>> messages, Channel channel)
            throws InvalidPersonException, IOException {
        System.out.println(messages.size());

        // The delivery tag of the last message, combined with multiple=true, covers the batch.
        Message<Person> lastDelivery = messages.get(messages.size() - 1);
        Object rawTag = lastDelivery.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        long lastDeliveryTag = (long) rawTag;

        channel.basicNack(lastDeliveryTag, true, false);
    }
}
package com.example.demo;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.messaging.handler.annotation.Header;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * Listener on {@code deadLetter.queue} that always fails, used to exercise the retry advice.
 */
@Component
public class DlqConsumer {

    List<String> list = new ArrayList<>();

    private int i = 0;

    /**
     * Prints the person's name and the current thread, then always throws.
     *
     * @param message the dead-lettered message
     * @param channel the channel the message was received on (not used by this body)
     * @throws InvalidPersonException always
     */
    @RabbitListener(
            id = "myDlqListener",
            queues = "deadLetter.queue",
            containerFactory = "rabbitDlqListenerContainerFactory")
    public void getMessage(Message<Person> message, Channel channel) throws InvalidPersonException {
        Person person = message.getPayload();
        System.out.println(person.getName());

        // The delivery tag is read but never used: this method neither acks nor nacks.
        Object rawTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        long deliveryTag = (long) rawTag;

        System.out.println(Thread.currentThread());
        throw new InvalidPersonException();
    }
}
异常类:
package com.example.demo;
/**
 * Checked exception declared by the demo listeners; it carries no message or cause.
 */
public class InvalidPersonException extends Exception {

    private static final long serialVersionUID = -3154618962130084535L;
}
package com.example.demo;
import java.io.Serializable;
/**
 * Simple mutable payload with an identifier and a name.
 */
public class Person implements Serializable {

    // Field name kept as-is so the default serialized form is unchanged.
    private Long Id;

    private String name;

    /** Creates a person with both fields unset ({@code null}). */
    public Person() {
    }

    /**
     * Creates a person with the given values.
     *
     * @param id   the identifier
     * @param name the name
     */
    public Person(Long id, String name) {
        this.Id = id;
        this.name = name;
    }

    /** @return the identifier, or {@code null} if it was never set */
    public Long getId() {
        return this.Id;
    }

    /** @param id the new identifier */
    public void setId(Long id) {
        this.Id = id;
    }

    /** @return the name, or {@code null} if it was never set */
    public String getName() {
        return this.name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }
}
build.gradle
buildscript {
    ext {
        springBootVersion = '2.2.10.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.test'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    // Spring Boot and Spring AMQP artifacts carry no explicit version: with the
    // dependency-management plugin applied, they are resolved from the BOM of the Boot
    // plugin version above. Previously starter-amqp (2.4.8), starter-web (2.2.10.RELEASE)
    // and spring-rabbit-test (2.3.6) were pinned to mutually inconsistent versions.
    // NOTE(review): JUnit is normally a test-only dependency; confirm nothing in main
    // uses it before narrowing this to testImplementation.
    implementation 'junit:junit:4.12'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    implementation 'io.springfox:springfox-swagger2:2.7.0'
    implementation 'io.springfox:springfox-swagger-ui:2.7.0'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}
解决方法
<!-- NOTE(review): looks like a Maven Assembly descriptor that zips db-delivery-info.txt
     without a base directory; it appears unrelated to the surrounding answer. Confirm. -->
<assembly>
  <id>db-delivery-zip</id>
  <formats>
    <format>zip</format>
  </formats>
  <includeBaseDirectory>false</includeBaseDirectory>
  <files>
    <file>
      <source>db-delivery-info.txt</source>
    </file>
  </files>
</assembly>
当您使用手动确认(MANUAL)模式时,所有消息的确认(ack)和拒绝(nack)都需要由您自己负责。
请改用默认的确认模式(即删除 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); 这一行)。