Parking XML messages with an invalid format in an AMQP parking lot queue

Question

Given that I have this IntegrationFlow:

IntegrationFlows.from(
        Amqp.inboundAdapter(rabbitConnectionFactory,QUEUE)
                .messageConverter(new MarshallingMessageConverter(xmlMarshaller))
                .defaultRequeueRejected(false)
                .concurrentConsumers(2)
                .maxConcurrentConsumers(4)
                .channelTransacted(true)
                .errorHandler(new ConditionalRejectingErrorHandler())
)
        .log(INFO,AMQP_LOGGER_CATEGORY)
        .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(deathCheckHandler))
                .subscribe(f -> f.handle(service))
        )
        .get();

where deathCheckHandler is:

@Component
public class DeathCheckHandler {

    private static final Logger logger = LoggerFactory.getLogger(lookup().lookupClass());

    private static final int RETRY_COUNT = 3;
    private final RabbitTemplate rabbitTemplate;
    private final Jaxb2Marshaller xmlMarshaller;

    public DeathCheckHandler(RabbitTemplate rabbitTemplate,Jaxb2Marshaller xmlMarshaller) {
        this.rabbitTemplate = rabbitTemplate;
        this.xmlMarshaller = xmlMarshaller;
    }

    @ServiceActivator
    public void check(Message<?> message) {
        MessageHeaders headers = message.getHeaders();

        Optional<XDeath> rejected = findAnyRejectedXDeathMessageHeader(headers);
        if (rejected.isPresent()) {
            int rejectedCount = rejected.get().getCount();
            logger.debug("Rejected count is {}",rejectedCount);
            if (rejectedCount > RETRY_COUNT) {
                parkMessage(message);
            }
        }
    }

    private void parkMessage(Message<?> message) {
        Object payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();
        String parkingExchange = (String) headers.get("amqp_receivedExchange");
        String parkingRoutingKey = ((String) headers.get("amqp_consumerQueue")).replace("queue","plq");
        rabbitTemplate.setMessageConverter(new MarshallingMessageConverter(xmlMarshaller));
        logger.warn("Tried more than {} times. Parking rejected message: {} to exchange {} and routing key {}",RETRY_COUNT,payload,parkingExchange,parkingRoutingKey);
        rabbitTemplate.convertAndSend(parkingExchange,parkingRoutingKey,payload);
        // Cause the message to be acknowledged and not routed to the DLQ
        throw new ImmediateAcknowledgeAmqpException("Give up retrying message: " + payload);
    }
}

DeathCheckHandler deals with the dead-lettering that is set up on the AMQP queue.
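For readers unfamiliar with the x-death header that this check relies on: RabbitMQ adds one entry per (queue, reason) pair, and each entry carries a "reason" and a "count". The question does not show findAnyRejectedXDeathMessageHeader or the XDeath type, so the following is only a plain-Java sketch of what such a lookup might do, assuming the header is available as a list of maps. The class and method names (XDeathSupport, rejectedCount, shouldPark) are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java sketch; the class and method names are hypothetical. */
final class XDeathSupport {

    private XDeathSupport() {
    }

    /**
     * Returns the "count" of the first x-death entry whose "reason" is "rejected",
     * or an empty Optional when there is no such entry.
     */
    static Optional<Long> rejectedCount(List<Map<String, Object>> xDeath) {
        if (xDeath == null) {
            return Optional.empty();
        }
        return xDeath.stream()
                // String.valueOf also copes with non-String values such as the client's LongString
                .filter(entry -> "rejected".equals(String.valueOf(entry.get("reason"))))
                .map(entry -> entry.get("count"))
                .filter(Number.class::isInstance)
                .map(count -> ((Number) count).longValue())
                .findFirst();
    }

    /** True when the message has been rejected more than retryCount times. */
    static boolean shouldPark(List<Map<String, Object>> xDeath, int retryCount) {
        return rejectedCount(xDeath).map(count -> count > retryCount).orElse(false);
    }
}
```

With RETRY_COUNT = 3 as in the handler above, a message is parked once the "rejected" entry reports a count of 4 or more.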

How can I park XML messages that have an incorrect format, i.e. when MarshallingMessageConverter throws an UnmarshallingFailureException?

I would like to park them in a similar way to DeathCheckHandler#parkMessage.

It should be possible with ConditionalRejectingErrorHandler, but I do not know how.

Answer

Clone the ConditionalRejectingErrorHandler.

Use this method as a template...

@Override
public void handleError(Throwable t) {
    log(t);
    if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
        if (this.discardFatalsWithXDeath && t instanceof ListenerExecutionFailedException) {
            Message failed = ((ListenerExecutionFailedException) t).getFailedMessage();
            if (failed != null) {
                List<Map<String,?>> xDeath = failed.getMessageProperties().getXDeathHeader();
                if (xDeath != null && xDeath.size() > 0) {
                    this.logger.error("x-death header detected on a message with a fatal exception; "
                            + "perhaps requeued from a DLQ? - discarding: " + failed);
                    throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present");
                }
            }
        }
        throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal",this.rejectManual,t);
    }
}

By default, fatal exceptions on messages that carry an x-death header are discarded via an ImmediateAcknowledgeAmqpException.

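It is not easy to subclass and override this method because the fields are private, so simply copying the class (and publishing to the parking lot before throwing the IAAE) would be the easiest option.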
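The order of operations in such a copy (publish to the parking lot first, then throw the acknowledging exception) can be modelled without any Spring dependency. This is only a sketch of the control flow, not the actual Spring AMQP class: ConversionFailure stands in for a conversion error such as UnmarshallingFailureException, AcknowledgeAndDiscard stands in for ImmediateAcknowledgeAmqpException, and the Consumer stands in for whatever publishes to the parking lot (for example a RabbitTemplate). All of these names are hypothetical. In the real handler the raw message would come from ListenerExecutionFailedException#getFailedMessage(), as in the template method above.

```java
import java.util.function.Consumer;

/** Dependency-free model of a "park, then acknowledge" error handler. All names are hypothetical. */
final class ParkingErrorHandlerSketch {

    /** Stand-in for a conversion failure such as UnmarshallingFailureException. */
    static final class ConversionFailure extends RuntimeException {
        ConversionFailure(String message) {
            super(message);
        }
    }

    /** Stand-in for ImmediateAcknowledgeAmqpException: tells the container to ack and drop. */
    static final class AcknowledgeAndDiscard extends RuntimeException {
        AcknowledgeAndDiscard(String message) {
            super(message);
        }
    }

    /** Stand-in for the publisher that sends the raw body to the parking lot queue. */
    private final Consumer<String> parkingLot;

    ParkingErrorHandlerSketch(Consumer<String> parkingLot) {
        this.parkingLot = parkingLot;
    }

    /**
     * Parks the raw body when the failure was caused by a conversion error, then signals
     * "acknowledge and discard". Any other failure is left to the default handling.
     */
    void handleError(Throwable t, String rawBody) {
        if (causedBy(t, ConversionFailure.class)) {
            // Publish first: if parking itself fails, the acknowledging exception is never thrown.
            parkingLot.accept(rawBody);
            throw new AcknowledgeAndDiscard("Parked message that could not be converted");
        }
    }

    /** Walks the cause chain looking for an exception of the given type. */
    static boolean causedBy(Throwable t, Class<? extends Throwable> type) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (type.isInstance(cause)) {
                return true;
            }
        }
        return false;
    }
}
```

Note that the raw body is parked rather than a converted payload, since the payload is exactly what could not be unmarshalled.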

I will make some improvements to that class to make it easier to customize/override.

Pull Request
