问题描述
我有一个 Spring Cloud Stream 应用程序,它通过 Rabbit Binder 接收来自 RabbitMQ 的消息、更新我的数据库,并发送一条或多条消息。我的应用程序可以概括为下面这个演示应用(demo app):
问题是 @Transactional
似乎不起作用(或者至少这是我的印象),因为如果出现异常,数据库会回滚,但消息仍然会被发送,即使消费者/生产者默认都已配置为事务性的(transacted)。
我想要实现的是:发生异常时,被消费的消息在重试后进入 DLQ,数据库回滚,并且不发送任何消息。
我怎样才能做到这一点?
这是我向 my-input
exchange 发送消息时演示应用程序的输出:
2021-01-19 14:31:20.804 ERROR 59593 --- [nput.my-group-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking MyListener#process[1 args]; nested exception is java.lang.RuntimeException: MyError,FailedMessage=Genericmessage [payload=byte[4],headers={amqp_receivedDeliveryMode=NON_PERSISTENT,amqp_receivedRoutingKey=#,amqp_receivedExchange=my-input,amqp_deliveryTag=2,deliveryAttempt=3,amqp_consumerQueue=my-input.my-group,amqp_redelivered=false,id=006f733f-5eab-9119-347a-625570383c47,amqp_consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ,sourceData=(Body:'[B@177259f3(byte[4])' MessageProperties [headers={},contentLength=0,receivedDeliveryMode=NON_PERSISTENT,redelivered=false,receivedExchange=my-input,receivedRoutingKey=#,deliveryTag=2,consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ,consumerQueue=my-input.my-group]),contentType=application/json,timestamp=1611063077789}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.Abstractdispatcher.tryOptimizeddispatch(Abstractdispatcher.java:115)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dodispatch(Unicastingdispatcher.java:133)
at org.springframework.integration.dispatcher.Unicastingdispatcher.dispatch(Unicastingdispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:187)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:166)
at org.springframework.messaging.core.GenericmessagingTemplate.doSend(GenericmessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: MyError
at com.example.demo.MyListener.process(DemoApplication.kt:46)
at com.example.demo.MyListener$$FastClassBySpringcglib$$4381219a.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.cglibAopProxy$cglibMethodInvocation.invokeJoinpoint(cglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.cglibAopProxy$cglibMethodInvocation.proceed(cglibAopProxy.java:750)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.cglibAopProxy$cglibMethodInvocation.proceed(cglibAopProxy.java:750)
at org.springframework.aop.framework.cglibAopProxy$DynamicAdvisedInterceptor.intercept(cglibAopProxy.java:692)
at com.example.demo.MyListener$$EnhancerBySpringcglib$$f4ed3689.process(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.messaging.handler.invocation.invocableHandlerMethod.doInvoke(invocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.invocableHandlerMethod.invoke(invocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 29 more
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
解决方法
由于您将失败的消息发布到 DLQ,因此从 Rabbit 的角度来看,事务成功并且原始消息得到确认并从队列中删除,并且提交了 Rabbit 事务。
你不能用 republishToDlq
做你想做的事。
如果您使用普通的 DLQ 机制(republishToDlq=false
,即由代理(broker)将原始消息发送到 DLQ),而不是附带额外的元数据重新发布,它就可以正常工作。
如果您想使用元数据重新发布,您可以使用非事务性 RabbitTemplate
手动发布到 DLQ(因此 DLQ 发布不会随着其他发布而回滚)。
编辑
这是一个如何做你需要做的事情的例子。
注意事项:
- 我们必须添加一个错误处理程序来重新抛出异常。
- 我们必须将重试移至侦听器容器而不是绑定器;否则,重试将在事务内发生,如果重试成功,多条消息将被存放在输出队列中。
- 要使有状态重试正常工作,我们必须能够唯一标识每条消息;最简单的解决方案是让发送方设置唯一的
message_id
属性(例如 UUID)。
/**
 * Sample processor application: consumes from the {@code input} binding, publishes the
 * upper-cased payload to the {@code output} binding and then deliberately fails unless the
 * payload is {@code okAfterRetry} on its second delivery attempt.
 *
 * <p>Retries are configured as a stateful retry advice on the listener container of group
 * {@code group1}; once they are exhausted the failed message is republished to the
 * {@code DLX} exchange with routing key {@code rk}.
 */
@SpringBootApplication
@EnableBinding(Processor.class)
public class So65792643Application {

    @Autowired
    Processor processor;

    public static void main(String[] args) {
        SpringApplication.run(So65792643Application.class, args);
    }

    /**
     * Handles one inbound message: prints the payload, sends its upper-cased form to the
     * output channel, and afterwards either reports success or throws.
     *
     * @param in the inbound message
     * @throws RuntimeException unless the payload is {@code okAfterRetry} and the current
     *         retry count is 1
     */
    @StreamListener(Processor.INPUT)
    public void in(Message<String> in) {
        String payload = in.getPayload();
        System.out.println(payload);
        // The publish happens before the failure check on purpose, so a failing attempt has
        // already sent something to the output channel when it throws.
        processor.output().send(new GenericMessage<>(payload.toUpperCase()));
        int attempt = RetrySynchronizationManager.getContext().getRetryCount();
        if (!payload.equals("okAfterRetry") || attempt != 1) {
            throw new RuntimeException();
        }
        System.out.println("success");
    }

    /**
     * Recoverer that republishes a failed message to exchange {@code DLX} with routing key
     * {@code rk} through the supplied template.
     */
    @Bean
    RepublishMessageRecoverer repub(RabbitTemplate template) {
        return new RepublishMessageRecoverer(template, "DLX", "rk");
    }

    /** Declares the dead-letter queue {@code my-output.dlq}. */
    @Bean
    Queue dlq() {
        return new Queue("my-output.dlq");
    }

    /** Declares the direct exchange {@code DLX}. */
    @Bean
    DirectExchange dlx() {
        return new DirectExchange("DLX");
    }

    /** Binds the dead-letter queue to {@code DLX} using routing key {@code rk}. */
    @Bean
    Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("rk");
    }

    /**
     * Error handler for the {@code my-input.group1.errors} channel: rethrows the
     * {@link MessagingException} carried as the error message payload instead of absorbing it.
     */
    @ServiceActivator(inputChannel = "my-input.group1.errors")
    void errorHandler(ErrorMessage message) {
        throw (MessagingException) message.getPayload();
    }

    /** Prints every message arriving on {@code my-output.dlq}, prefixed with {@code DLQ:}. */
    @RabbitListener(queues = "my-output.dlq")
    void dlqListen(Message<String> in) {
        System.out.println("DLQ:" + in);
    }

    /**
     * Prints messages arriving on {@code my-output.group2}; anything other than
     * {@code OKAFTERRETRY} is flagged as unexpected.
     */
    @RabbitListener(queues = "my-output.group2")
    void outListen(String in) {
        System.out.println(in.equals("OKAFTERRETRY") ? in : "Should not see this:" + in);
    }

    /**
     * Installs a stateful retry advice on the listener container of group {@code group1}.
     *
     * <p>Retries are moved from the binder to stateful retries in the container so that each
     * retry is rolled back, to avoid multiple publishes to output (see {@code max-attempts: 1}
     * in the yaml). For stateful retry to work, inbound messages must carry a unique
     * {@code message_id} property.
     */
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer(RepublishMessageRecoverer repub) {
        return (container, destinationName, group) -> {
            if (!"group1".equals(group)) {
                return;
            }
            container.setAdviceChain(
                    RetryInterceptorBuilder.stateful()
                            .backOffOptions(1000, 2.0, 10000)
                            .maxAttempts(2)
                            .recoverer(republishingRecoverer(repub))
                            // The retry state key is the AMQP message id; a unique key could
                            // be generated some other way instead.
                            .keyGenerator(args ->
                                    ((org.springframework.amqp.core.Message) args[1])
                                            .getMessageProperties()
                                            .getMessageId())
                            .build());
        };
    }

    /**
     * Builds the recovery callback used once retries are exhausted: it republishes the failed
     * message through {@code repub} and then throws {@link AmqpRejectAndDontRequeueException}.
     * NOTE(review): the throw presumably makes the container reject the original delivery
     * without requeueing it; confirm against the container's error handling.
     */
    private MethodInvocationRecoverer<?> republishingRecoverer(RepublishMessageRecoverer repub) {
        return (args, cause) -> {
            org.springframework.amqp.core.Message failed =
                    ((ListenerExecutionFailedException) cause).getFailedMessage();
            repub.recover(failed, cause);
            throw new AmqpRejectAndDontRequeueException(cause);
        };
    }
}
# Binder configuration for the sample application.
spring:
  cloud:
    stream:
      rabbit:
        default:
          # Both producer and consumer bindings are transacted by default.
          producer:
            transacted: true
          consumer:
            transacted: true
            requeue-rejected: true
      bindings:
        input:
          destination: my-input
          group: group1
          consumer:
            # Binder-level retry is limited to a single attempt; retries are performed
            # by the stateful retry advice installed on the listener container instead.
            max-attempts: 1
        output:
          destination: my-output
          producer:
            required-groups: group2
okAfterRetry
2021-01-20 12:45:24.385 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
okAfterRetry
success
OKAFTERRETRY
notOkAfterRetry
2021-01-20 12:45:39.336 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
notOkAfterRetry
2021-01-20 12:45:39.339 WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
DLQ:GenericMessage [payload=notOkAfterRetry,...,x-exception-message...