问题描述
我有一个用例,我想在应用程序收到消息后立即确认消息,而不是等待流完成。流程如下。目前简单的消息监听器容器配置为AUTO确认模式。
@Bean
public IntegrationFlow processAggregateEventFlow(SimpleMessageListenerContainer messageListenerContainer,@Qualifier("errorChannel") MessageChannel eventErrorChannel) {
return IntegrationFlows
// Create message listener container and queueEvent error channel
.from(Amqp.inboundAdapter(messageListenerContainer).errorChannel(eventErrorChannel))
.transform(new JsonToObjectTransformer(Request.class,jacksonConfiguration.jsonObjectMapper()))
.filter(Request.class,e -> true)
.handle(requestMessageHandler)
.get();
}
解决方法
请参阅 AcknowledgeMode.NONE
并阅读他们的 JavaDoc:
public enum AcknowledgeMode {
/**
* No acks - {@code autoAck=true} in {@code Channel.basicConsume()}.
*/
NONE,/**
* Manual acks - user must ack/nack via a channel aware listener.
*/
MANUAL,/**
* Auto - the container will issue the ack/nack based on whether
* the listener returns normally,or throws an exception.
* <p><em>Do not confuse with RabbitMQ {@code autoAck} which is
* represented by {@link #NONE} here</em>.
*/
AUTO;
使用 MANUAL
,有几个标头添加到来自该 AMQP 入站通道适配器的消息中:
headers.put(AmqpHeaders.DELIVERY_TAG,deliveryTag);
headers.put(AmqpHeaders.CHANNEL,channel);
因此您可以在 .handle()
之后使用 from(Amqp)
来调用 channel.basicAck(deliveryTag,false);
。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/amqp.html#amqp-inbound-ack