问题描述
我们的应用程序具有MessageHandler
的实现,有望实现处理来自队列的消息。在不同的环境中,我们连接到不同的队列(GCP PubSub / AMQ)。
对于GCP PubSub,当从MessageHandler#handleMessage
(org.springfranework.messaging
)引发异常时,该消息将返回给PubSub。
而AMQ不会发生相同的事情。
MessageHandler
实例只是在用ServiceActivator
注释的方法上返回。即未明确映射到其他任何地方。
我们为AMQ提供的简化配置如下
配置创建JMSEndpoint:
private JmsMessageDrivenEndpoint jmsMessageDrivenEndpoint(@Autowired MessageChannel inputChannel) {
ConnectionFactory connectionFactory= new JmsConnectionFactory(username,password,url);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDestinationName("my-subscription");
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container,listener);
endpoint.setoutputChannel(inputChannel);
return endpoint;
}
配置频道:
@Bean
public MessageChannel myChannel() {
return new DirectChannel();
}
MessageHandler的ServiceActivator:
@Bean
@ServiceActivator(inputChannel = "myChannel")
public MessageHandler engagementRegistration() {
return new CustomHandler();
}
简化的自定义处理程序:
class CustomHandler implements MessageHandler<?>
{
public void handleMessage(Message<?> message) {
// logic to handle the message and throw MessageException(message)
// When I have to send back the message
}
}
解决方法
很高兴看到此事的一些配置...
您的GCP PubSub可能未配置为自动确认(或类似名称)。还是自动确认,但是在整个过程将控制权交还给GCP使用者之后。
不清楚什么是AMQ,但可能会在到达消费者时自动确认消息,而不会等待过程完成。
更新
因此,您的AMQ与JMS有关。
正如我所说,您在发生异常之前会先进行自动确认。
参见JmsAccessor.sessionAcknowledgeMode
。它是Session.AUTO_ACKNOWLEDGE
。现在查看其JavaDocs:
/** With this acknowledgment mode,the session automatically acknowledges
* a client's receipt of a message either when the session has successfully
* returned from a call to {@code receive} or when the message
* listener the session has called to process the message successfully
* returns.
*/
static final int AUTO_ACKNOWLEDGE = 1;
考虑在该SimpleMessageListenerContainer
上将其配置为true
:
/**
* Set the transaction mode that is used when creating a JMS {@link Session}.
* Default is "false".
* <p>Note that within a JTA transaction,the parameters passed to
* {@code create(Queue/Topic)Session(boolean transacted,int acknowledgeMode)}
* method are not taken into account. Depending on the Java EE transaction context,* the container makes its own decisions on these values. Analogously,these
* parameters are not taken into account within a locally managed transaction
* either,since the accessor operates on an existing JMS Session in this case.
* <p>Setting this flag to "true" will use a short local JMS transaction
* when running outside of a managed transaction,and a synchronized local
* JMS transaction in case of a managed transaction (other than an XA
* transaction) being present. This has the effect of a local JMS
* transaction being managed alongside the main transaction (which might
* be a native JDBC transaction),with the JMS transaction committing
* right after the main transaction.
* @see javax.jms.Connection#createSession(boolean,int)
*/
public void setSessionTransacted(boolean sessionTransacted) {