Problem description
Given that I have the following IntegrationFlow:
IntegrationFlows.from(
        Amqp.inboundAdapter(rabbitConnectionFactory, QUEUE)
                .messageConverter(new MarshallingMessageConverter(xmlMarshaller))
                .defaultRequeueRejected(false)
                .concurrentConsumers(2)
                .maxConcurrentConsumers(4)
                .channelTransacted(true)
                .errorHandler(new ConditionalRejectingErrorHandler())
        )
        .log(INFO, AMQP_LOGGER_CATEGORY)
        .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(deathCheckHandler))
                .subscribe(f -> f.handle(service))
        )
        .get();
where deathCheckHandler is:
@Component
public class DeathCheckHandler {

    private static final Logger logger = LoggerFactory.getLogger(lookup().lookupClass());
    private static final int RETRY_COUNT = 3;

    private final RabbitTemplate rabbitTemplate;
    private final Jaxb2Marshaller xmlMarshaller;

    public DeathCheckHandler(RabbitTemplate rabbitTemplate, Jaxb2Marshaller xmlMarshaller) {
        this.rabbitTemplate = rabbitTemplate;
        this.xmlMarshaller = xmlMarshaller;
    }

    @ServiceActivator
    public void check(Message<?> message) {
        MessageHeaders headers = message.getHeaders();
        Optional<XDeath> rejected = findAnyRejectedXDeathMessageHeader(headers);
        if (rejected.isPresent()) {
            int rejectedCount = rejected.get().getCount();
            logger.debug("Rejected count is {}", rejectedCount);
            // park the message once it has been rejected more than RETRY_COUNT times
            if (rejectedCount > RETRY_COUNT) {
                parkMessage(message);
            }
        }
    }

    private void parkMessage(Message<?> message) {
        Object payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();
        String parkingExchange = (String) headers.get("amqp_receivedExchange");
        // the parking lot queue name is derived from the consumer queue name
        String parkingRoutingKey = ((String) headers.get("amqp_consumerQueue")).replace("queue", "plq");
        rabbitTemplate.setMessageConverter(new MarshallingMessageConverter(xmlMarshaller));
        logger.warn("Tried more than {} times. Parking rejected message: {} to exchange {} and routing key {}",
                RETRY_COUNT, payload, parkingExchange, parkingRoutingKey);
        rabbitTemplate.convertAndSend(parkingExchange, parkingRoutingKey, payload);
        // cause the message to be acknowledged and not routed to DLQ
        throw new ImmediateAcknowledgeAmqpException("Give up retrying message: " + payload);
    }
}
DeathCheckHandler handles the dead-lettering configured on the AMQP queue.
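The handler calls findAnyRejectedXDeathMessageHeader, which is not shown in the question. As a rough illustration only, here is a plain-Java sketch of what such a lookup might do, assuming the x-death header arrives as a list of maps with `reason`, `count`, and `queue` entries (the layout RabbitMQ documents for dead-lettered messages). The class name `XDeathHeaders`, the method `findAnyRejected`, and the shape of `XDeath` are all hypothetical and not taken from the original code.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: the question does not show its own implementation.
final class XDeathHeaders {

    // Assumed minimal shape of the asker's XDeath type.
    static final class XDeath {
        final String queue;
        final String reason;
        final long count;

        XDeath(String queue, String reason, long count) {
            this.queue = queue;
            this.reason = reason;
            this.count = count;
        }

        long getCount() {
            return count;
        }
    }

    private XDeathHeaders() {
    }

    // Returns the first x-death entry whose reason is "rejected", if there is one.
    static Optional<XDeath> findAnyRejected(List<Map<String, ?>> xDeath) {
        if (xDeath == null) {
            return Optional.empty();
        }
        return xDeath.stream()
                .filter(entry -> "rejected".equals(String.valueOf(entry.get("reason"))))
                .map(XDeathHeaders::toXDeath)
                .findFirst();
    }

    // Copies the fields of interest; a missing or non-numeric count becomes 0.
    private static XDeath toXDeath(Map<String, ?> entry) {
        Object count = entry.get("count");
        long n = count instanceof Number ? ((Number) count).longValue() : 0L;
        return new XDeath(String.valueOf(entry.get("queue")),
                String.valueOf(entry.get("reason")), n);
    }
}
```

The count returned by `getCount()` is what a handler like `check` would compare against its retry limit.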
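How can I park XML messages that arrive in an incorrect format (that is, when MarshallingMessageConverter throws UnmarshallingFailureException)?
I would like to park them in the same way as DeathCheckHandler#parkMessage does.
It should be possible with ConditionalRejectingErrorHandler, but I don't know how.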
Solution
Clone ConditionalRejectingErrorHandler.
Use this method as a template...
@Override
public void handleError(Throwable t) {
    log(t);
    if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
        if (this.discardFatalsWithXDeath && t instanceof ListenerExecutionFailedException) {
            Message failed = ((ListenerExecutionFailedException) t).getFailedMessage();
            if (failed != null) {
                List<Map<String, ?>> xDeath = failed.getMessageProperties().getXDeathHeader();
                if (xDeath != null && xDeath.size() > 0) {
                    this.logger.error("x-death header detected on a message with a fatal exception; "
                            + "perhaps requeued from a DLQ? - discarding: " + failed);
                    throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present");
                }
            }
        }
        throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal",
                this.rejectManual, t);
    }
}
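By default, fatal exceptions on messages that carry an x-death header are discarded via ImmediateAcknowledgeAmqpException.
Subclassing and overriding this method is not easy because the fields are private, so simply copying the class (and publishing to the parking lot before throwing the IAAE) would be easiest.
I will make some improvements to the class to make it easier to customize/override.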
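To make the "copy the class and park before throwing the IAAE" suggestion more concrete, here is a plain-Java model of the control flow only. It is a sketch under assumptions, not the Spring AMQP implementation: `ParkingLot`, `FailedDelivery`, `AckException`, and `RejectException` are hypothetical stand-ins for `RabbitTemplate`, the failed `Message` obtained from `ListenerExecutionFailedException#getFailedMessage()`, `ImmediateAcknowledgeAmqpException`, and `AmqpRejectAndDontRequeueException`. It also assumes that every fatal failure with a known message should be parked, which is one possible reading of the advice. Because a message that failed conversion has no usable payload, the sketch forwards the raw body rather than a converted object.

```java
// Plain-Java model of the control flow; every type here is a hypothetical stand-in.
final class ParkingErrorHandlerSketch {

    // Stand-in for ImmediateAcknowledgeAmqpException: ack the message, skip the DLQ.
    static final class AckException extends RuntimeException {
        AckException(String message) {
            super(message);
        }
    }

    // Stand-in for AmqpRejectAndDontRequeueException: reject without requeue.
    static final class RejectException extends RuntimeException {
        RejectException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for the failed message and the two properties parkMessage reads.
    static final class FailedDelivery {
        final String receivedExchange;
        final String consumerQueue;
        final byte[] body;

        FailedDelivery(String receivedExchange, String consumerQueue, byte[] body) {
            this.receivedExchange = receivedExchange;
            this.consumerQueue = consumerQueue;
            this.body = body;
        }
    }

    // Stand-in for a raw send through RabbitTemplate.
    interface ParkingLot {
        void send(String exchange, String routingKey, byte[] body);
    }

    private final ParkingLot parkingLot;

    ParkingErrorHandlerSketch(ParkingLot parkingLot) {
        this.parkingLot = parkingLot;
    }

    // fatal models exceptionStrategy.isFatal(t); failed may be null when unknown.
    void handleError(Throwable t, boolean fatal, FailedDelivery failed) {
        if (!fatal) {
            return; // non-fatal errors are left to the normal handling path
        }
        if (failed != null) {
            // Same naming rule as DeathCheckHandler#parkMessage in the question.
            String routingKey = failed.consumerQueue.replace("queue", "plq");
            parkingLot.send(failed.receivedExchange, routingKey, failed.body);
            throw new AckException("Parked message that failed with a fatal error");
        }
        throw new RejectException("Error Handler converted exception to fatal", t);
    }
}
```

In a real copy of ConditionalRejectingErrorHandler, the park step would sit immediately before the point where the IAAE is thrown, and the raw `Message` would be sent with `RabbitTemplate#send(exchange, routingKey, message)` instead of `convertAndSend`.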