网络断开后ActiveMQ 5重新交付不起作用

问题描述

框架

  • Java 1.7.0_191和1.8.0_181
  • 春季4.3.18。发布
  • ActiveMQ 5.14.5

场景

两个客户端和一个服务器。客户端1失去连接(由于网络断开连接)。客户端2发送并使用一条消息。客户端1重新获得网络连接。现在,我希望将消息重新传递给客户端1。尽管客户端1现在可以正常工作(获取所有新消息),但是客户端1仍未收到更新消息,因此仍有一定的差距,因此客户端不再受信任。 >

这是设计使然还是我配置有误?

Serverbroker

final String brokerURI = String.format("broker://(tcp://%s:%s)?brokerName=clientbroker",host,port);
final brokerService brokerService = brokerFactory.createbroker(brokerURI);
brokerService.setUseJmx(true);
brokerService.setDataDirectory(dataDirectory);
final LoggingbrokerPlugin loggingbrokerPlugin = new LoggingbrokerPlugin();
loggingbrokerPlugin.setLogConsumerEvents(true);
loggingbrokerPlugin.setLogProducerEvents(true);
brokerService.setPlugins(new brokerPlugin[] { loggingbrokerPlugin });
brokerService.start();

ServerProducer

final jmstemplate jmstemplate = new jmstemplate();
jmstemplate.setPubSubDomain(true);
jmstemplate.setExplicitQosEnabled(true);
jmstemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
jmstemplate.setTimetoLive(600_000L);
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package","java"));
activeMQConnectionFactory.setbrokerURL(String.format("tcp://%s:%s",port));
jmstemplate.setConnectionFactory(new PooledConnectionFactory(activeMQConnectionFactory));
jmstemplate.convertAndSend(JmsQueueConstants.SERVER_UPDATE,new NotificationQueueEntry());

ClientConsumer

@JmsListener(destination = JmsQueueConstants.SERVER_UPDATE)
public void receive(final NotificationQueueEntry msg) {
    process(msg);
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(jmsConnectionFactory());
    factory.setPubSubDomain(true);
    factory.setSessionTransacted(true);
    factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
    factory.setConcurrency("2");
    return factory;
}

private ConnectionFactory jmsConnectionFactory() {
    final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package","java"));
    activeMQConnectionFactory.setbrokerURL(String.format("failover:(tcp://%s:%s)?jms.closeTimeout=%d",port,600_000));
    final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setinitialRedeliveryDelay(10_000L);
    redeliveryPolicy.setRedeliveryDelay(1_000L);
    redeliveryPolicy.setMaximumRedeliveries(600);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setTransportListener(new TransportListener() {
        
        @Override
        public void onCommand(final Object command) {
        }
        
        @Override
        public void onException(final IOException error) {
            connected = false;
        }
        
        @Override
        public void transportInterupted() {
            connected = false;
        }
        
        @Override
        public void transportResumed() {
            if (!connected) {
                reconnect();
            }
        }
        
    });
    return new PooledConnectionFactory(activeMQConnectionFactory);
}

connected reconnect()只是为了显示客户端中的断开状态,而不是主动重新连接ActiveMQ连接。

日志

2020-09-09 11:39:52,415 [ActiveMQ Transport: tcp:///<client-1-ip>:54049@1079] DEBUG o.a.a.b.T.Transport:241 Transport Connection to: tcp://<client-1-ip>:54049 Failed: java.net.socketException: Connection reset
2020-09-09 11:39:52,416 [ActiveMQ brokerService[clientbroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-1-ip>:54049@1079
2020-09-09 11:39:52,417 [ActiveMQ brokerService[clientbroker] Task-17] DEBUG o.a.a.b.TransportConnection:1233 Cleaning up connection resources: tcp://<client-1-ip>:54049
2020-09-09 11:39:56,339 [RMI TCP Connection(310)-<client-2-ip>] INFO m.p.s.DatabaseServiceImplementation:98 saving entity to database
2020-09-09 11:39:56,342 [server-pool-1-thread-16] INFO m.p.c.JmsConnectionFactoryCache:73 Create connection for address: client-2:1079
2020-09-09 11:39:56,343 [server-pool-1-thread-16] INFO m.p.s.ServerProducerImplementation:268 Send message to client-2:1079. Data: NotificationQueueEntry
2020-09-09 11:39:56,343 [ActiveMQ Transport: tcp:///<client-2-ip>:63125@1079] INFO o.a.a.b.u.LoggingbrokerPlugin:257 Removing Producer: ProducerInfo {commandId = 4,responserequired = true,producerId = ID:client-2-61550-1599640739463-4:5:1:1,destination = null,brokerPath = null,dispatchAsync = false,windowSize = 0,sentCount = 1}
2020-09-09 11:39:56,345 [ActiveMQ brokerService[clientbroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-2-ip>:63125@1079
2020-09-09 11:39:56,345 [server-pool-1-thread-16] DEBUG o.a.a.util.ThreadPoolUtils:155 Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@f78a38[Terminated,pool size = 0,active threads = 0,queued tasks = 0,completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
2020-09-09 11:39:56,346 [server-pool-1-thread-16] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp://client-2/<client-2-ip>:1079@63125
2020-09-09 11:39:56,348 [server-pool-1-thread-16] DEBUG o.a.a.t.WireFormatNegotiator:82 Sending: WireFormatInfo { version=12,properties={StackTraceEnabled=true,PlatformDetails=JVM: 1.8.0_181,25.181-b13,Oracle Corporation,OS: Windows 10,10.0,x86,CacheEnabled=true,Host=client-2,TcpNoDelayEnabled=true,SizePrefixdisabled=false,CacheSize=1024,ProviderName=ActiveMQ,TightEncodingEnabled=true,MaxFrameSize=9223372036854775807,MaxInactivityDuration=30000,MaxInactivityDurationInitalDelay=10000,ProviderVersion=5.14.5},magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12,properties={TcpNoDelayEnabled=true,StackTraceEnabled=true,Q]} and remote: WireFormatInfo { version=12,350 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp://client-2/<client-2-ip>:1079@52652 before negotiation: OpenWireFormat{version=12,cacheEnabled=false,stackTraceEnabled=false,tightEncodingEnabled=false,sizePrefixdisabled=false,maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-2-ip>:52652@1079 before negotiation: OpenWireFormat{version=12,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp://client-2/<client-2-ip>:1079@52652 after negotiation: OpenWireFormat{version=12,cacheEnabled=true,stackTraceEnabled=true,tightEncodingEnabled=true,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-2-ip>:52652@1079 after negotiation: OpenWireFormat{version=12,354 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingbrokerPlugin:192 Adding Producer: ProducerInfo {commandId = 4,producerId = ID:client-2-61550-1599640739463-4:6:1:1,sentCount = 0}
2020-09-09 11:39:56,357 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingbrokerPlugin:285 Sending message: ActiveMQObjectMessage {commandId = 5,messageId = ID:client-2-61550-1599640739463-4:6:1:1:1,originalDestination = null,originalTransactionId = null,destination = topic://ServerUpdate,transactionId = null,expiration = 1599644996357,timestamp = 1599644396357,arrival = 0,brokerInTime = 0,brokerOutTime = 0,correlationId = null,replyTo = null,persistent = true,type = null,priority = 4,groupID = null,groupSequence = 0,targetConsumerId = null,compressed = false,userID = null,content = org.apache.activemq.util.ByteSequence@11eb3c5,marshalledProperties = null,dataStructure = null,redeliveryCounter = 0,size = 0,properties = null,readOnlyProperties = false,readOnlyBody = false,droppable = false,jmsXGroupFirstForConsumer = false}
2020-09-09 11:39:56,358 [ActiveMQ brokerService[clientbroker] Task-18] INFO o.a.a.b.u.LoggingbrokerPlugin:428 preProcessdispatch: Messagedispatch {commandId = 0,responserequired = false,consumerId = ID:client-2-52500-1599644321472-1:1:1:1,message = ActiveMQObjectMessage {commandId = 5,brokerInTime = 1599644396358,content = org.apache.activemq.util.ByteSequence@162a6d1,size = 1936,jmsXGroupFirstForConsumer = false},redeliveryCounter = 0}
2020-09-09 11:39:56,359 [ActiveMQ brokerService[clientbroker] Task-18] INFO o.a.a.b.u.LoggingbrokerPlugin:436 postProcessdispatch: Messagedispatch {commandId = 0,brokerOutTime = 1599644396359,419 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] INFO o.a.a.b.u.LoggingbrokerPlugin:157 AckNowledging message for client ID: ID:client-2-52500-1599644321472-0:1,ID:client-2-61550-1599640739463-4:6:1:1:1
2020-09-09 11:39:56,421 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] DEBUG o.a.a.t.LocalTransaction:48 commit: TX:ID:client-2-52500-1599644321472-1:1:1 syncCount: 1
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12,properties={CacheSize=1024,PlatformDetails=JVM: 1.7.0_191,24.191-b08,ProviderVersion=3.2.0.0-SNAPSHOT,MaxInactivityDurationInitalDelay=10000},Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-1-ip>:50446@1079 before negotiation: OpenWireFormat{version=12,maxFrameSize=9223372036854775807}
2020-09-09 11:41:04,216 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-1-ip>:50446@1079 after negotiation: OpenWireFormat{version=12,maxFrameSize=9223372036854775807}

解决方法

这是非持久主题订户的预期行为。如果没有连接这样的订户,它将不会收到发送给代理的任何消息。

还值得注意的是,这通常不是所谓的“交付”。消息重新传递就是发生的情况,例如,当某条消息在事务中被使用,并且该事务被回滚,然后将该消息重新传递给到客户端以再次尝试。