问题描述
使用Qpid客户端连接Apache Artemis代理以实现高可用性。
代理实例在两个节点中运行,并且配置在broker.xml中列出了复制内容
brokers实例在node1(主节点)和node2(从属节点)上启动,并且正在运行,没有任何问题。
骆驼qpid jms客户端在执行骆驼客户端时使用URL配置为failover:(amqp://localhost:5672,amqp://localhost:5673)
,并且上下文无任何问题地启动,并且还注意到在broker UI Console中列为AMQP协议的连接。 [下面提供了配置详细信息]
通过以下配置,一切正常。
为了测试高可用性,我在node1上停止了代理实例,并期望Qpid Camel客户端自动发现node2代理并处理消息。但是它没有按预期连接。
但是,当我使用带有包括tcp方案连接的URL的aretmis-jms骆驼客户端时,我能够成功验证高可用性,其中由于某种原因,由于某些原因,客户端在节点2上自动发现代理时,停止了在node1中运行的代理被停止。同样,当node1启动时,备份客户端会自动连接到node1。
Qpid客户端无法发现备份代理。以下配置中的任何问题
master:有关详细配置,请参见link
<ha-policy>
<replication>
<master>
<check-for-live-server>true</check-for-live-server>
</master>
</replication>
</ha-policy>
从属:
<ha-policy>
<replication>
<slave>
<allow-failback>true</allow-failback>
</slave>
</replication>
</ha-policy>
客户使用骆驼
<bean id="jmsampqConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
<property name="remoteURI" value="failover:(ampq://localhost:5672,ampq://localhost:5673)" />
<property name="username" value="user"/>
<property name="password" value="pass"/>
</bean>
<bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
<property name="maxConnections" value="5" />
<property name="connectionFactory" ref="jmsamqpConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsPooledConnectionFactory" />
<property name="concurrentConsumers" value="5" />
</bean>
<bean id="jms" class="org.apache.camel.component.amqp.AMQPComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
<bean id="CustomBean1" class="org.specific.process.class" />
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="jms:queue:enterprise1.queue" />
<convertBodyTo type="java.lang.String" />
<bean ref="CustomBean1" method="processCamelExchangeData" />
</route>
</camelContext>
maven依赖
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-amqp-starter</artifactId>
<version>2.23.0</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.54.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-amqp</artifactId>
<version>2.23.0</version>
</dependency>
日志:
2020-09-17 20:15:18,684: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:18,690: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@3962ec84
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@147e0734
2020-09-17 20:15:18,693: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@2bdab835
2020-09-17 20:15:18,694: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@7b8aebd0
2020-09-17 20:15:18,695: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker@55222ee9
2020-09-17 20:15:18,743: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://localhost:5672 in-progress
2020-09-17 20:15:19,102: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (SaslMechanismFinder.java:106) - Best match for SASL auth was: SASL-PLAIN
2020-09-17 20:15:19,119: FailoverProvider: async work thread DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConnectionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1,configuredURI = failover:(amqp://localhost:5672,amqp://localhost:5673),connectedURI = null } (1)
2020-09-17 20:15:19,162: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 } is Now open:
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:884) - Processing alternates uris:URI Pool { [amqp://localhost:5673,amqp://localhost:5672] } with new set: [amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:899) - Replacing uris:URI Pool { [amqp://localhost:5673,amqp://localhost:5672] } with new set: [amqp://localhost:5672,amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:913) - Processing alternates done new uris:URI Pool { [amqp://localhost:5672,amqp://localhost:61617?amqp.vhost=localhost] }
2020-09-17 20:15:19,165: AmqpProvider :(1):[amqp://localhost:5672] INFO (JmsConnection.java:1339) - Connection ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 connected to server: amqp://localhost:5672
2020-09-17 20:15:19,168: main DEBUG (JmsConsumer.java:106) - Started listener container org.apache.camel.component.jms.DefaultJmsMessageListenerContainer@5e9bf744 on destination enterprise1.queue
2020-09-17 20:15:19,168: main INFO (DefaultCamelContext.java:4013) - Route: route1 started and consuming from: jms://queue:enterprise1.queue
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3989) - Route: route2 >>> EventDrivenConsumerRoute[jmsTopic://topic:enterprise2.topic -> Pipeline[[Channel[convertBodyTo[java.lang.String]],Channel[org.apache.camel.component.bean.BeanProcessor@69d103f0]]]]
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1001) on route: route2
2020-09-17 20:15:19,216: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1 } (2)
2020-09-17 20:15:19,216: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2 } (3)
2020-09-17 20:15:19,216: Camel (camel) thread #6 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:3 } (4)
2020-09-17 20:15:19,217: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4 } (5)
2020-09-17 20:15:19,238: Camel (camel) thread #8 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:5 } (6)
2020-09-17 20:15:19,361: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1,destination = enterprise1.queue } (7)
2020-09-17 20:15:19,362: main DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=consumers,name=JmsConsumer(0x1132baa3)
2020-09-17 20:15:19,362: main DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[jmsTopic://topic:enterprise2.topic]
2020-09-17 20:15:19,364: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:19,365: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
...
2020-09-17 20:15:22,486: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: start -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4:1,destination = enterprise1.queue } (40)
2020-09-17 20:15:22,490: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2:1 (41)
2020-09-17 20:15:22,491: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1 (42)
...
解决方法
客户端正在从与其连接的Artemis服务器接收到已知代理URI的更新,该服务器似乎仅知道运行的服务器具有公布的端口61617。这导致客户端用代理返回的新集替换URI,如日志中所示:
Replacing uris:URI Pool { [amqp://localhost:5673,amqp://localhost:5672] } with new set: [amqp://localhost:5672,amqp://localhost:61617?amqp.vhost=localhost]
因此,如果重新连接到的适当服务器是端口5673上的原始备用服务器,则它将永远不会尝试该服务器,因为第一个代理告诉它唯一已知的代理将其发送到该备用服务器。通过将下面的故障转移URI选项设置为 ADD 或 IGNORE ,可以更改客户端行为以不替换其已知主机的原始配置。
failover.amqpOpenServerListAction 控制当来自远程对等方的连接Open框架向客户端提供故障转移主机列表时,故障转移传输的行为。该选项接受以下三个值之一:替换,添加或忽略(默认为替换)。如果配置了REPLACE,则将当前服务器以外的所有故障转移URI替换为远程对等方提供的URI。如果配置了ADD,则将远程服务器提供的URI通过重复数据删除添加到现有的故障转移URI集。如果配置了IGNORE,则将删除来自远程的任何更新,并且不会更改正在使用的故障转移URI集。
有关更多信息,请参见Qpid JMS客户端configuration page。