在处理失败的情况下,如何将STOMP消息重新传递给使用者?

问题描述

高级体系结构

JMS(生产者/消费者) Artemis(STOMP) Websocket经纪人中继服务 STOMP-over-Websocket-client(生产者/消费者)

一些观察

  1. 在STOMP使用者中,使用 client-individual ack订阅,无论我是 NACK 还是 ACK ,消息都将被丢弃阿耳emi弥斯。我希望将邮件重新发送给相同或任何其他消费者。有办法实现吗?

  2. 在JMS使用者中,如果在Artemis上接收到消息的使用者掉线了,则不会传递持久消息。我的期望是,一旦消费者服务再次恢复正常运行,便会传递持久的消息。

class StompSessionHandlerImpl implements StompSessionHandler {
    @Override
    public void afterConnected(StompSession session,StompHeaders connectedHeaders) {
        session.setAutoReceipt(Boolean.FALSE);
        StompHeaders headers1 = new StompHeaders();
        headers1.setDestination("/queue/msg");
        headers1.add("durable-subscription-name",messagingUtil.getServiceSubscriptionChannel());
        headers1.add("Authorization","Bearer ".concat(token));
        headers1.setAck("client-individual");
        session.subscribe(headers1,this);

    }

    @Override
    public void handleException(StompSession session,StompCommand command,StompHeaders headers,byte[] payload,Throwable exception) {
        session.acknowledge(Objects.requireNonNull(headers.getMessageId()),false);
    }

    @Override
    public void handleTransportError(StompSession session,Throwable exception) {
        synchronized (StompSessionHandlerImpl.msgSenderLock) {
            if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
                initStompSession();
            }
        }
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return COMessage.class;
    }

    @Override
    public void handleFrame(StompHeaders headers,Object payload) {
        if (payload == null) return;
        COMessage msg = (COMessage) payload;
     try {
        stompMessagingService.handleReceivedMessages(msg);
        self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(),true);
       } catch (Exception e) {
           self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(),false);
       }

    }


    @PreDestroy
    public void cleanUp() {
        self.stompMessagingService.getStompSession().disconnect();
    }

}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
    @Bean
    public WebSocketStompClient stompClient() {
        WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
        List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,false);
        converter.setObjectMapper(objectMapper);
        stompClient.setMessageConverter(converter);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10000);
        scheduler.initialize();
        stompClient.setTaskScheduler(scheduler);
        stompClient.setDefaultHeartbeat(new long[]{20000,20000});
        stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
        ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
        return stompClient;
    }
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private String host;

    private String password;

    private String user;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/queue","/topic","/exchange")
                .setRelayHost(host)
                .setClientLogin(user)
                .setClientPasscode(password)
                .setSystemHeartbeatSendInterval(20000)
                .setSystemLogin(user)
                .setSystemPasscode(password)
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");
        config.setApplicationDestinationPrefixes("/device");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
        registry.setErrorHandler(new StompSubProtocolErrorHandler());
    }

    @Bean
    public DefaultSimpUserRegistry getDefaultSimpRegistry() {
        return new DefaultSimpUserRegistry();
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(Integer.MAX_VALUE);
        registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
        registry.setTimeToFirstMessage(300000);
        registry.setSendTimeLimit(300000);
        registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
                return new EmaWebSocketHandlerDecorator(webSocketHandler);
            }
        });

    }

}
class ArtemisConfig extends ArtemisAutoConfiguration {

    @Bean("mqConnectionFactory")
    public ConnectionFactory senderActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory =
               new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
        connectionFactory.setUser(user);
        connectionFactory.setPassword(password);
        connectionFactory.setConnectionTTL(-1L);
        connectionFactory.setClientID(clientID);
        connectionFactory.setEnableSharedClientID(true);
        connectionFactory.setPreAcknowledge(Boolean.FALSE);
        return connectionFactory;
    }

    @Bean("mqCachingConnectionFactory")
    @Primary
    public ConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    @Bean("jmsTemplate")
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setMessageConverter(jsonMessageConverter);
        jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
        jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
        return jmsTemplate;
    }

    @PreDestroy
    public void cleanUp() {
        if (connection.isStarted()) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.error("Failed to close the JMS connection {0}",e);
            }
        }
    }

}

解决方法

使用ActiveMQ Artemis时,STOMP ACK框架会告诉代理该消息已被成功使用,因此应将其从队列中删除。一个STOMP NACK帧告诉代理该消息已成功使用 ,因此代理将丢弃该消息。 STOMP规范未指定此处的确切行为。它只说:

NACKACK相反。它用于告诉服务器客户端没有使用该消息。然后,服务器可以将消息发送到其他客户端,将其丢弃,或将其放入死信队列。确切的行为是特定于服务器的。

NACK具有与ACK相同的标头:id(必填)和transaction(可选)。

NACK适用于一条消息(如果订阅的确认模式为client-individual)或之前发送且尚未确认或未确认的所有消息(如果订阅的确认模式为{{1} }。

如果您希望重新发送该消息,则既不确认也不拒绝该消息,并且在关闭消费者的连接时,该消息将重新放置在队列中,以传递给另一个(或相同)客户端。

将来,我希望这种行为是可配置的。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...