Handling message resend with a pooled connection factory and ActiveMQ Artemis failover in Spring

Problem description

I connect to an ActiveMQ Artemis high-availability cluster through a pooled connection factory. The code below shows my current implementation.

  import javax.jms.ConnectionFactory;
  import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
  import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
  import org.springframework.context.annotation.Bean;
  import org.springframework.jms.core.JmsTemplate;

  // The bean methods below live inside a Spring @Configuration class.

  @Bean
  public ConnectionFactory jmsConnectionFactory() {
      // brokerUrl holds the HA connection string quoted further down.
      return new ActiveMQConnectionFactory(brokerUrl, username, password);
  }

  @Bean
  public JmsPoolConnectionFactory pooledConnectionFactoryOnline() {
      JmsPoolConnectionFactory poolingFactory = new JmsPoolConnectionFactory();
      poolingFactory.setConnectionFactory(jmsConnectionFactory());
      poolingFactory.setMaxConnections(3);
      poolingFactory.setConnectionIdleTimeout(0);
      return poolingFactory;
  }

  @Bean
  public JmsTemplate jmsTemplateOnline() {
      JmsTemplate jmsTemplate = new JmsTemplate();
      jmsTemplate.setConnectionFactory(pooledConnectionFactoryOnline());
      jmsTemplate.setDefaultDestinationName(QUEUE);
      return jmsTemplate;
  }

The pooled connection factory above is org.messaginghub.pooled.jms.JmsPoolConnectionFactory (although I ran into similar problems with org.springframework.jms.connection.CachingConnectionFactory). The connection string used for the failover case is (tcp://broker1:61616,tcp://broker2:62616)?ha=true&reconnectAttempts=-1
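To make the shape of that connection string explicit, here is a minimal sketch in plain Java that assembles it from a host list and a parameter map. The names `HaUrlBuilder` and `build` are hypothetical helpers for illustration only and are not part of Artemis or Spring; the hosts and parameters are the ones quoted above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not an Artemis API): builds "(tcp://a,tcp://b)?k=v&k2=v2".
public class HaUrlBuilder {

    public static String build(List<String> hostPorts, Map<String, String> params) {
        // Each entry becomes one tcp:// connector; the group is wrapped in parentheses.
        String connectors = hostPorts.stream()
                .map(hp -> "tcp://" + hp)
                .collect(Collectors.joining(",", "(", ")"));
        if (params.isEmpty()) {
            return connectors;
        }
        // Parameters are appended once, after the parenthesized group.
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return connectors + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("ha", "true");
        params.put("reconnectAttempts", "-1");
        System.out.println(build(List.of("broker1:61616", "broker2:62616"), params));
    }
}
```

The resulting string would then be passed as the brokerUrl argument of the connection factory; keeping the host list in one place makes it easier to check that the ports match the connectors configured on the brokers.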

My HA policy configuration for the master broker is shown below:

 <connectors>
    <connector name="broker1-connector">tcp://broker1:61616</connector>
    <connector name="broker2-connector">tcp://broker2:61616</connector>
 </connectors>

 <ha-policy>
    <replication>
      <master>
        <check-for-live-server>true</check-for-live-server>
      </master>
    </replication>
 </ha-policy>

 <cluster-connections>
    <cluster-connection name="myhost1-cluster">
      <connector-ref>broker1-connector</connector-ref>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <static-connectors>
        <connector-ref>broker2-connector</connector-ref>
      </static-connectors>
    </cluster-connection>
 </cluster-connections>

And for the slave broker:

<ha-policy>
   <replication>
      <slave>
          <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>

The logs of the master broker are provided below:


2021-01-24 21:05:56,093 INFO  [org.apache.activemq.artemis.core.server] AMQ221082: Initializing metrics plugin org.apache.activemq.artemis.core.server.metrics.plugins.ArtemisPrometheusMetricsPlugin with properties: {}
2021-01-24 21:05:56,266 INFO  [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
2021-01-24 21:05:56,288 INFO  [org.apache.activemq.artemis.core.server] AMQ221000: live Message broker is starting with configuration broker Configuration (clustered=true,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2021-01-24 21:05:58,987 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup,removing /var/lib/artemis/data/bindings/oldreplica.94
2021-01-24 21:05:58,994 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/bindings to /var/lib/artemis/data/bindings/oldreplica.96
2021-01-24 21:05:59,001 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup,removing /var/lib/artemis/data/journal/oldreplica.94
2021-01-24 21:05:59,058 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/journal to /var/lib/artemis/data/journal/oldreplica.96
2021-01-24 21:05:59,062 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup,removing /var/lib/artemis/data/paging/oldreplica.94
2021-01-24 21:05:59,068 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/paging to /var/lib/artemis/data/paging/oldreplica.96
2021-01-24 21:05:59,135 INFO  [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2021-01-24 21:05:59,140 WARN  [org.apache.activemq.artemis.core.server] AMQ222007: Security risk! Apache ActiveMQ Artemis is running with the default cluster admin user and default password. Please see the cluster chapter in the ActiveMQ Artemis User Guide for instructions on how to change this.
2021-01-24 21:05:59,149 INFO  [org.apache.activemq.artemis.core.server] AMQ221057: Global Max Size is being adjusted to 1/2 of the JVM max size (-Xmx). being defined as 16,089,350,144
2021-01-24 21:05:59,300 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2021-01-24 21:05:59,303 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2021-01-24 21:05:59,305 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2021-01-24 21:05:59,306 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2021-01-24 21:05:59,306 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2021-01-24 21:05:59,307 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2021-01-24 21:05:59,463 INFO  [org.apache.activemq.artemis.core.server] AMQ221109: Apache ActiveMQ Artemis Backup Server version 2.13.0 [null] started,waiting live to fail before it gets active
2021-01-24 21:05:59,555 INFO  [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2021-01-24 21:05:59,638 INFO  [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2021-01-24 21:06:00,447 INFO  [io.hawt.HawtioContextListener] Initialising hawtio services
2021-01-24 21:06:00,471 INFO  [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2021-01-24 21:06:00,474 INFO  [io.hawt.jmx.JmxTreeWatcher] Welcome to hawtio 1.5.12 : http://hawt.io/ : Don't cha wish your console was hawt like me? ;-)
2021-01-24 21:06:00,478 INFO  [io.hawt.jmx.UploadManager] Using file upload directory: /var/lib/artemis/tmp/uploads
2021-01-24 21:06:00,501 INFO  [io.hawt.web.AuthenticationFilter] Starting hawtio authentication filter,JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2021-01-24 21:06:00,535 INFO  [io.hawt.web.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation,value=file:/var/lib/artemis/etc/jolokia-access.xml]
2021-01-24 21:06:00,572 INFO  [io.hawt.web.RBACMBeanInvoker] Using MBean [hawtio:type=security,area=jmx,rank=0,name=HawtioDummyJMXSecurity] for role based access control
2021-01-24 21:06:00,824 INFO  [io.hawt.system.ProxyWhitelist] Initial proxy whitelist: [localhost,127.0.0.1,172.23.0.7,42546424839f]
2021-01-24 21:06:01,245 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://0.0.0.0:8161
2021-01-24 21:06:01,245 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://0.0.0.0:8161/console/jolokia
2021-01-24 21:06:01,245 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://0.0.0.0:8161/console
2021-01-24 21:06:03,263 INFO  [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=b96ecec9-e13e-11ea-8a4f-0242ac170006 is synchronized with live-server.
2021-01-24 21:06:09,763 INFO  [org.apache.activemq.artemis.core.server] AMQ221031: backup announced
2021-01-24 21:06:09,806 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to broker2/broker2:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2021-01-24 21:06:09,875 INFO  [org.apache.activemq.artemis.core.server] AMQ221037: ActiveMQServerImpl::serverUUID=b96ecec9-e13e-11ea-8a4f-0242ac170006 to become 'live'
2021-01-24 21:06:09,897 WARN  [org.apache.activemq.artemis.core.client] AMQ212004: Failed to connect to server.
2021-01-24 21:06:10,553 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: deploying address DLQ supporting [ANYCAST]
2021-01-24 21:06:10,554 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: deploying ANYCAST queue DLQ on address DLQ
2021-01-24 21:06:10,555 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: deploying address ExpiryQueue supporting [ANYCAST]
2021-01-24 21:06:10,555 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: deploying ANYCAST queue ExpiryQueue on address ExpiryQueue
2021-01-24 21:06:10,803 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2021-01-24 21:06:10,865 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]

The corresponding logs of the slave broker are shown below:

2021-01-24 21:05:59,975 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/journal/activemq-data-1262.amq (size=10,485,760) to replica.
2021-01-24 21:06:01,346 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/journal/activemq-data-1261.amq (size=10,760) to replica.
2021-01-24 21:06:02,253 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/bindings/activemq-bindings-1191.bindings (size=1,048,576) to replica.
2021-01-24 21:06:02,363 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/bindings/activemq-bindings-1196.bindings (size=1,[...]
[...],451 INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /var/lib/artemis/data/bindings/activemq-bindings-1189.bindings (size=1,576) to replica.
2021-01-24 21:06:09,756 INFO  [org.apache.activemq.artemis.core.server] AMQ224100: Timed out waiting for large messages deletion with IDs [],might not be deleted if broker crashes atm
2021-01-24 21:06:09,[...] might not be deleted if broker crashes atm
2021-01-24 21:06:10,046 INFO  [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message broker version 2.13.0 [b96ecec9-e13e-11ea-8a4f-0242ac170006] stopped,uptime 6 hours 32 minutes
2021-01-24 21:06:10,046 INFO  [org.apache.activemq.artemis.core.server] AMQ221039: Restarting as Replicating backup server after live restart
2021-01-24 21:06:10,050 INFO  [org.apache.activemq.artemis.core.server] AMQ221000: backup Message broker is starting with configuration broker Configuration (clustered=true,pagingDirectory=data/paging)
2021-01-24 21:06:10,053 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup,removing /var/lib/artemis/data/bindings/oldreplica.101
2021-01-24 21:06:10,059 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/bindings to /var/lib/artemis/data/bindings/oldreplica.103
2021-01-24 21:06:10,060 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup,removing /var/lib/artemis/data/journal/oldreplica.101
2021-01-24 21:06:10,110 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/journal to /var/lib/artemis/data/journal/oldreplica.103
2021-01-24 21:06:10,111 INFO  [org.apache.activemq.artemis.core.server] AMQ221055: There were too many old replicated folders upon startup,removing /var/lib/artemis/data/paging/oldreplica.100
2021-01-24 21:06:10,117 INFO  [org.apache.activemq.artemis.core.server] AMQ222162: Moving data directory /var/lib/artemis/data/paging to /var/lib/artemis/data/paging/oldreplica.102
2021-01-24 21:06:10,120 INFO  [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2021-01-24 21:06:10,121 WARN  [org.apache.activemq.artemis.core.server] AMQ222007: Security risk! Apache ActiveMQ Artemis is running with the default cluster admin user and default password. Please see the cluster chapter in the ActiveMQ Artemis User Guide for instructions on how to change this.
2021-01-24 21:06:10,124 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2021-01-24 21:06:10,127 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2021-01-24 21:06:10,127 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2021-01-24 21:06:10,127 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2021-01-24 21:06:10,127 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2021-01-24 21:06:10,128 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2021-01-24 21:06:11,138 INFO  [org.apache.activemq.artemis.core.server] AMQ221109: Apache ActiveMQ Artemis Backup Server version 2.13.0 [null] started,waiting live to fail before it gets active
2021-01-24 21:06:14,559 INFO  [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=b96ecec9-e13e-11ea-8a4f-0242ac170006 is synchronized with live-server.
2021-01-24 21:06:14,594 INFO  [org.apache.activemq.artemis.core.server] AMQ221031: backup announced

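When I test failover by stopping the master broker, I can see that my client receives a connection exception, which I am trying to handle so that no messages are lost. I stop the Docker container with docker stop (which first sends SIGTERM to the running container and, after a timeout, sends SIGKILL). Since I know that all traffic will be redirected to the slave broker, my approach is as follows: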

  @Autowired
  JmsPoolConnectionFactory poolFactory;

  try {
      jmsTemplateOnline.convertAndSend(QUEUE, message);
  } catch (JmsException e) {
      // The first send failed: retry once on a connection freshly borrowed from the pool.
      try (Connection connection = poolFactory.createConnection();
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           MessageProducer producer = session.createProducer(new ActiveMQQueue(QUEUE))) {

          producer.send(messageConverter.toMessage(message, session));

      } catch (Exception jmsException) {
          // The retry failed as well; at this point the message is not sent.
          jmsException.printStackTrace();
      }
  }

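My reasoning is that, because the connections in the pool have failed, they should be discarded and new connections established to the slave broker, so obtaining a new connection should let me send my message. What actually happens is: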

[Thread-4 (ActiveMQ-client-global-threads)] [WARN ] org.apache.activemq.artemis.core.client - AMQ212037: Connection failure to /broker1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
[Thread-1 (ActiveMQ-client-global-threads)] [WARN ] org.apache.activemq.artemis.core.client - AMQ212037: Connection failure to /broker1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
[Thread-2 (ActiveMQ-client-global-threads)] [WARN ] org.apache.activemq.artemis.core.client - AMQ212037: Connection failure to /broker1:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]

This is the exception I get before I try to resend the message:

[http-nio-8080-exec-1] [INFO ]  Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response

In some tests the resend succeeds, but in others it fails with the following exception:

[http-nio-8080-exec-1] [INFO ]
javax.jms.IllegalStateException: AMQ219018: Producer is closed
        at org.apache.activemq.artemis.core.client.impl.ClientProducerImpl.checkClosed(ClientProducerImpl.java:301)
        at org.apache.activemq.artemis.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:123)
        at org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer.doSendx(ActiveMQMessageProducer.java:483)
        at org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:220)
        at org.messaginghub.pooled.jms.JmsPoolMessageProducer.sendMessage(JmsPoolMessageProducer.java:194)
        at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:88)
        at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:77)
...
Caused by: ActiveMQObjectClosedException[errorType=OBJECT_CLOSED message=AMQ219018: Producer is closed]
        ... 108 more
24-01-2021 16:07:27[http-nio-8080-exec-1] [WARN ] o.messaginghub.pooled.jms.JmsPoolSession - Caught exception trying close() when putting session back into the pool,will invalidate. javax.jms.IllegalStateException: Session is closed
javax.jms.IllegalStateException: Session is closed

My main goal is to find a way to avoid losing any messages during failover. Can you point out what I am doing wrong and how I could handle this case better?
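One way to frame the resend logic is as a bounded retry in which every attempt acquires a fresh connection, session and producer instead of reusing objects that the failover may already have closed. The sketch below is plain Java with no JMS dependency; `RetryingSender` and `withRetry` are hypothetical names, and the lambda stands in for the real send. It only illustrates the retry pattern under these assumptions and is not a verified fix for the exceptions shown above.

```java
import java.util.function.Supplier;

// Hypothetical helper: retries an action a bounded number of times with a fixed pause.
public class RetryingSender {

    /** Runs the action until it succeeds; rethrows the last failure if all attempts fail. */
    public static <T> T withRetry(Supplier<T> action, int maxAttempts, long pauseMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(pauseMillis); // give the backup broker time to become live
                }
            }
        }
        throw last;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for the real send: fails twice, then succeeds.
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated: Producer is closed");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a real application the lambda would wrap the send itself, for example the convertAndSend call from the question. Note that a retried send can deliver a message twice if the failed attempt actually reached the broker before the connection dropped, so the consumer side or broker-side duplicate detection has to tolerate that.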
