问题描述
使用在4 x节点群集中设置的Artemis 2.14,消息重新分发没有按我预期的方式工作-寻找一些帮助来阐明它的行为,即我的配置是否错误或我是否只是在期待系统做一些不做的事!
Artemis集群充当服务多个应用程序的中央消息传递中心。集群中的所有节点均配置相同。各种客户端应用程序都是消费者,生产者或两者,并且通常也可以适当地集群和扩展。
我遇到的问题情况的一个示例是一个消费者应用程序,该应用程序只有两个节点,每个节点运行一个消费者线程,因此,它使用的Artemis队列上只有2个消费者-即4个artemis中的2个节点(最多)将有消费者。生产者应用将消息发送到队列,由于各种原因,可能会出现这样的情况,即消息最终到达没有使用方的节点上。因为生产者客户端负载似乎不“喜欢”消费者的节点(可能会问一个单独的问题!),或者可能是因为消费者应用程序可能由于维护而停机,但生产者应用程序仍在运行并发送消息。我们为队列配置了“ redistribution-delay”(值为600000),并且我们希望这些消息在此时间后将自动移动到其他具有使用方的节点之一,但是似乎没有发生
回过头看文档,我意识到它说:“ ...在最后一个使用者关闭后,毫秒后延迟,然后重新分发消息。”。这是否意味着如果一个特定节点上没有任何使用者(我猜是自从上次重启以来),那么到达该节点的消息将永远不会重新分配?如果有,那么如何处理这种情况的建议?
我的broker.xml(在下面进行了修改,以简化并匿名化)
谢谢!
<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>${ARTEMIS_HOSTNAME}</name>
<metrics-plugin class-name="org.apache.activemq.artemis.core.server.metrics.plugins.ArtemisPrometheusMetricsPlugin"/>
<persistence-enabled>true</persistence-enabled>
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>644000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<connectors>
<!-- Connector used to be announced through cluster connections and notifications -->
<connector name="artemis1-${ENV}-connector">tcp://artemis1-${ENV}:61616</connector>
<connector name="artemis2-${ENV}-connector">tcp://artemis2-${ENV}:61616</connector>
<connector name="artemis3-${ENV}-connector">tcp://artemis3-${ENV}:61616</connector>
<connector name="artemis4-${ENV}-connector">tcp://artemis4-${ENV}:61616</connector>
</connectors>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>644000</page-sync-timeout>
<acceptors>
<acceptor name="artemis-clients">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>
<cluster-user>cluster-user</cluster-user>
<cluster-password>XXXXXXXXXXXX</cluster-password>
<cluster-connections>
<cluster-connection name="artemis-cluster-${ENV}">
<address></address>
<connector-ref>${ARTEMIS_HOSTNAME}-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors allow-direct-connections-only="true">
<connector-ref>artemis1-${ENV}-connector</connector-ref>
<connector-ref>artemis2-${ENV}-connector</connector-ref>
<connector-ref>artemis3-${ENV}-connector</connector-ref>
<connector-ref>artemis4-${ENV}-connector</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<address-settings>
<!-- if you define auto-create on certain queues,management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<config-delete-addresses>FORCE</config-delete-addresses>
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
<address-setting match="my.organisation.#"> <!-- standard settings for all queues -->
<!-- error queues automatically created based on these params -->
<dead-letter-address>ERROR_MESSAGES</dead-letter-address>
<auto-create-expiry-resources>true</auto-create-expiry-resources>
<auto-create-dead-letter-resources>true</auto-create-dead-letter-resources>
<dead-letter-queue-prefix></dead-letter-queue-prefix> <!-- override the default -->
<dead-letter-queue-suffix>_error</dead-letter-queue-suffix>
<!-- redelivery & redistribution settings -->
<redelivery-delay>600000</redelivery-delay>
<max-delivery-attempts>9</max-delivery-attempts>
<redistribution-delay>600000</redistribution-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>false</auto-create-addresses>
<auto-create-jms-queues>false</auto-create-jms-queues>
<auto-create-jms-topics>false</auto-create-jms-topics>
<config-delete-addresses>FORCE</config-delete-addresses>
<config-delete-queues>FORCE</config-delete-queues>
</address-setting>
</address-settings>
<addresses>
<address name="my.organisation.app1.jms.queue"><anycast><queue name="my.organisation.app1.jms.queue" /></anycast></address>
<address name="my.organisation.app2.jms.queue.input"><anycast><queue name="my.organisation.app2.jms.queue.input" /></anycast></address>
<address name="my.organisation.app3.jms.queue.input"><anycast><queue name="my.organisation.app3.jms.queue.input" /></anycast></address>
</addresses>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
<security-setting match="my.organisation.app1.#">
<permission type="consume" roles="app1_role"/>
<permission type="browse" roles="app1_role"/>
<permission type="send" roles="app1_role"/>
</security-setting>
<security-setting match="my.organisation.app2.#">
<permission type="consume" roles="app2_role"/>
<permission type="browse" roles="app2_role"/>
<permission type="send" roles="app2_role"/>
</security-setting>
<security-setting match="my.organisation.app3.#">
<permission type="consume" roles="app3_role"/>
<permission type="browse" roles="app3_role"/>
<permission type="send" roles="app3_role"/>
</security-setting>
</security-settings>
</core>
</configuration>
解决方法
根据您的配置,如果将消息发送到没有使用方的节点,则应将其自动转发到具有使用方的节点。 The documentation将此称为“初始分布”。所谓的“重新分发”仅对在消费者在场时到达代理并随后断开连接的消息起作用。
如果您认为自己遇到了错误,请制定一个重现问题和open a JIRA的测试用例。