问题描述
我正在处理消息排序问题,不久前修复了它,现在修复不再起作用。
只是为了概述,我有以下环境:
订单在 tcpAdapter 和消息接收者之间的某个地方丢失了。
我使用以下方法修复了这个问题:
- 在生产者方面 - 使用发布者确认和返回
rabbitmq:
publisher-confirms: true
publisher-returns: true
- 在消费者方面 - 强制执行单线程执行器: 我在这里找到的想法:RabbitMQ - Message order of delivery,为此我使用了后处理器。
@Component
public class RabbitConnectionFactoryPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean,String beanName) throws BeansException {
if (bean instanceof CachingConnectionFactory) {
((CachingConnectionFactory) bean).setExecutor(Executors.newSingleThreadExecutor());
}
return bean;
}
}
现在,在一些 master-pom 更新之后(我们不控制 master pom,它在项目级别),修复突然不再起作用了。检查差异后,我没有看到spring-rabbit或spring-amqp有任何变化,我不明白为什么会有影响。
如果您需要具体示例,这里有更多详细信息:
- 制作人。
TCP Server 向 tcpAdapter 应用程序发送消息,该应用程序使用 spring-integration 流从 TCP 获取消息并将其发送到 rabbitmq。
这是执行此操作的代码(inboundAdapterClient 我没有在此处发布,因为我认为这不重要):
@Bean
public IntegrationFlow tcpToRabbitFlowClient() {
return IntegrationFlows.from(inboundAdapterClient())
.transform(tcpToRabbitTransformer)
.channel(TCP_ADAPTER_SOURCE);
.get();
}
tcpAdapter 应用程序以正确的顺序从 TCP 接收消息,但是 tcpAdapter rabbitmq 堆栈不会每次都以正确的顺序发送它们(80% 的时间正常,20% 的顺序错误)
这是 spring boot yml 配置(仅相关信息):
spring:
rabbitmq:
publisher-confirms: true
publisher-returns: true
cloud:
stream:
bindings:
tcpAdapterSource:
binder: rabbit
content-type: application/json
destination: tcpadapter.messagereceiver
- 消费者。
消息接收者强制执行单线程执行器,并配置如下。
这里是spring boot yml配置(仅相关信息)
spring:
cloud:
stream:
bindings:
fromTcpAdapter:
binder: rabbit
content-type: application/json
destination: tcpadapter.messagereceiver
rabbit:
default:
producer:
exchangeDurable: false
exchangeAutoDelete: true
consumer:
exchangeDurable: false
exchangeAutoDelete: true
来自 pom 的一些版本,也许有帮助:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>3.0.1.RELEASE</version>
</dependency>
解决方法
通过删除 yml 配置并使用显式 bean 声明和工厂配置解决,如下所述。唯一的问题是性能缓慢,但发布商确认这是预期的。
所以这确实是生产者的问题。
@Bean
public CachingConnectionFactory connectionFactory() {
com.rabbitmq.client.ConnectionFactory connectionFactoryClient = new com.rabbitmq.client.ConnectionFactory();
connectionFactoryClient.setUsername(username);
connectionFactoryClient.setPassword(password);
connectionFactoryClient.setHost(hostname);
connectionFactoryClient.setVirtualHost(vhost);
return new CachingConnectionFactory(connectionFactoryClient);
}
@Bean("rabbitTemplateAdapter")
@Primary
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirmType(CORRELATED);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)
-> log.debug("correlationData({}),ack({}),cause ({})",correlationData,cause));
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey)
-> log.debug("exchange({}),route({}),replyCode({}),replyText({}),message:{}",routingKey,message));
return rabbitTemplate;
}
用于发送消息:
rabbitTemplateAdapter.invoke(t -> {
t.convertAndSend(
exchange,DESTINATION,jsonMessage.getPayload(),m -> {outboundMapper().fromHeadersToRequest(jsonMessage.getHeaders(),m.getMessageProperties());
return m;
});
t.waitForConfirmsOrDie(10_000);
return true;
});
我使用 spring rabbit 和 amqp 版本做到了这一点:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
Spring amqp 文档帮助很大,使用的技术称为“范围操作”: https://docs.spring.io/spring-amqp/docs/2.2.7.RELEASE/reference/html/#scoped-operations