Qpid客户端Apache Artemis 2.14.0高可用性不起作用

问题描述

使用Qpid客户端连接Apache Artemis代理以实现高可用性。

代理实例在两个节点中运行,并且配置在broker.xml中列出了复制内容

brokers实例在node1(主节点)和node2(从属节点)上启动,并且正在运行,没有任何问题。

骆驼qpid jms客户端在执行骆驼客户端时使用URL配置为failover:(amqp://localhost:5672,amqp://localhost:5673),并且上下文无任何问题地启动,并且还注意到在broker UI Console中列为AMQP协议的连接。 [下面提供了配置详细信息]

通过以下配置,一切正常。

为了测试高可用性,我在node1上停止了代理实例,并期望Qpid Camel客户端自动发现node2代理并处理消息。但是它没有按预期连接。

但是,当我使用带有包括tcp方案连接的URL的aretmis-jms骆驼客户端时,我能够成功验证高可用性,其中由于某种原因,由于某些原因,客户端在节点2上自动发现代理时,停止了在node1中运行的代理被停止。同样,当node1启动时,备份客户端会自动连接到node1。

Qpid客户端无法发现备份代理。以下配置中的任何问题

master:有关详细配置,请参见link

<ha-policy>
   <replication>
      <master>
         <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

从属:

<ha-policy>
   <replication>
      <slave>
         <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>

客户使用骆驼

<bean id="jmsampqConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
       <property name="remoteURI" value="failover:(ampq://localhost:5672,ampq://localhost:5673)" />
       <property name="username" value="user"/>
       <property name="password" value="pass"/>
   </bean>

  <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="5" />
    <property name="connectionFactory" ref="jmsamqpConnectionFactory" />
  </bean>

  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
    <property name="concurrentConsumers" value="5" />
  </bean>

   <bean id="jms" class="org.apache.camel.component.amqp.AMQPComponent">
    <property name="configuration" ref="jmsConfig" />
  </bean>
  
   <bean id="CustomBean1" class="org.specific.process.class" /> 
   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="jms:queue:enterprise1.queue" />
      <convertBodyTo type="java.lang.String" />
      <bean ref="CustomBean1" method="processCamelExchangeData" />
    </route>
    </camelContext>

maven依赖

   <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp-starter</artifactId>
      <version>2.23.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>0.54.0</version>
     </dependency> 
     <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>artemis-jms-client</artifactId>
      <version>2.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.messaginghub</groupId>
      <artifactId>pooled-jms</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp</artifactId>
      <version>2.23.0</version> 
    </dependency>

日志:

2020-09-17 20:15:18,684: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:18,690: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@3962ec84
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@147e0734
2020-09-17 20:15:18,693: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@2bdab835
2020-09-17 20:15:18,694: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@7b8aebd0
2020-09-17 20:15:18,695: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@55222ee9
2020-09-17 20:15:18,743: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://localhost:5672 in-progress
2020-09-17 20:15:19,102: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (SaslMechanismFinder.java:106) - Best match for SASL auth was: SASL-PLAIN
2020-09-17 20:15:19,119: FailoverProvider: async work thread DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConnectionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1,configuredURI = failover:(amqp://localhost:5672,amqp://localhost:5673),connectedURI = null } (1)
2020-09-17 20:15:19,162: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 } is Now open:
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:884) - Processing alternates uris:URI Pool { [amqp://localhost:5673,amqp://localhost:5672] } with new set: [amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:899) - Replacing uris:URI Pool { [amqp://localhost:5673,amqp://localhost:5672] } with new set: [amqp://localhost:5672,amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:913) - Processing alternates done new uris:URI Pool { [amqp://localhost:5672,amqp://localhost:61617?amqp.vhost=localhost] }
2020-09-17 20:15:19,165: AmqpProvider :(1):[amqp://localhost:5672] INFO (JmsConnection.java:1339) - Connection ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 connected to server: amqp://localhost:5672
2020-09-17 20:15:19,168: main DEBUG (JmsConsumer.java:106) - Started listener container org.apache.camel.component.jms.DefaultJmsMessageListenerContainer@5e9bf744 on destination enterprise1.queue
2020-09-17 20:15:19,168: main INFO (DefaultCamelContext.java:4013) - Route: route1 started and consuming from: jms://queue:enterprise1.queue
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3989) - Route: route2 >>> EventDrivenConsumerRoute[jmsTopic://topic:enterprise2.topic -> Pipeline[[Channel[convertBodyTo[java.lang.String]],Channel[org.apache.camel.component.bean.BeanProcessor@69d103f0]]]]
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1001) on route: route2
2020-09-17 20:15:19,216: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1 } (2)
2020-09-17 20:15:19,216: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2 } (3)
2020-09-17 20:15:19,216: Camel (camel) thread #6 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:3 } (4)
2020-09-17 20:15:19,217: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4 } (5)
2020-09-17 20:15:19,238: Camel (camel) thread #8 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:5 } (6)
2020-09-17 20:15:19,361: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1,destination = enterprise1.queue } (7)
2020-09-17 20:15:19,362: main DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=consumers,name=JmsConsumer(0x1132baa3)
2020-09-17 20:15:19,362: main DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[jmsTopic://topic:enterprise2.topic]
2020-09-17 20:15:19,364: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:19,365: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
...
2020-09-17 20:15:22,486: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: start -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4:1,destination = enterprise1.queue } (40)
2020-09-17 20:15:22,490: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2:1 (41)
2020-09-17 20:15:22,491: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1 (42)
...

解决方法

客户端正在从与其连接的Artemis服务器接收到已知代理URI的更新,该服务器似乎仅知道运行的服务器具有公布的端口61617。这导致客户端用代理返回的新集替换URI,如日志中所示:

Replacing uris:URI Pool { [amqp://localhost:5673,amqp://localhost:5672] } with new set: [amqp://localhost:5672,amqp://localhost:61617?amqp.vhost=localhost]

因此,如果重新连接到的适当服务器是端口5673上的原始备用服务器,则它将永远不会尝试该服务器,因为第一个代理告诉它唯一已知的代理将其发送到该备用服务器。通过将下面的故障转移URI选项设置为 ADD IGNORE ,可以更改客户端行为以不替换其已知主机的原始配置。

failover.amqpOpenServerListAction 控制当来自远程对等方的连接Open框架向客户端提供故障转移主机列表时,故障转移传输的行为。该选项接受以下三个值之一:替换,添加或忽略(默认为替换)。如果配置了REPLACE,则将当前服务器以外的所有故障转移URI替换为远程对等方提供的URI。如果配置了ADD,则将远程服务器提供的URI通过重复数据删除添加到现有的故障转移URI集。如果配置了IGNORE,则将删除来自远程的任何更新,并且不会更改正在使用的故障转移URI集。

有关更多信息,请参见Qpid JMS客户端configuration page