Qpid (JMS) - Azure Service Bus - It is not possible for an entity that requires sessions to create a non-sessionful message receiver

Problem description

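We are using qpid-jms-client-0.57.0 to publish messages to and receive messages from Azure Service Bus. Service Bus can deliver messages through sessions in order to preserve message ordering. See here for more details: https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions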

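I am able to publish messages using JMSXGroupID, but I cannot receive messages from a session-enabled queue. I get this error: javax.jms.JMSException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:*,SystemTracker:mule-intr-sbus-test-standard :Queue:test-order,Timestamp:2021-07-28T11:07:49 TrackingId:**,SystemTracker:gateway7,Timestamp:2021-07-28T11:07:49 [condition = amqp:not-allowed]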

Could you suggest how to receive messages from a session-enabled queue?

Sample code

public void receiveMessage() throws Exception {
        System.out.println("** Receiver start **");

        Connection connection = createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        Destination destination = session.createQueue(QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(destination);
        
        messageConsumer.setMessageListener(this);
        
        System.out.println("** Receiver registered **");
    }

Error stack trace

Exception in thread "main" javax.jms.JMSException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:****,SystemTracker:****:Queue:test-order,Timestamp:2021-07-28T10:30:03 TrackingId:****,Timestamp:2021-07-28T10:30:03 [condition = amqp:not-allowed]
    at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
    at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
    at org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:125)
    at org.apache.qpid.jms.JmsMessageConsumer.<init>(JmsMessageConsumer.java:82)
    at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:479)
    at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:467)
    at org.apache.qpid.jms.JmsSession.createConsumer(JmsSession.java:459)
    at com.qpid.test.TestSessionEnable.receiveMessage(TestSessionEnable.java:70)
    at com.qpid.test.TestSessionEnable.main(TestSessionEnable.java:80)
Caused by: org.apache.qpid.jms.provider.ProviderException: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:****,Timestamp:2021-07-28T10:30:03 [condition = amqp:not-allowed]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:299)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:185)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:129)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:985)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:871)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)

Solution

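You have already asked this question on the Qpid mailing list and received an answer there, so I assume you were not satisfied with it, but that answer essentially covers the issue. The Microsoft client uses an illegal filter value that is not an AMQP described type, so even if the Qpid JMS client supported setting custom filters in some way (it does not), it would not allow this value.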

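In addition, the client does not expose that level of its internal implementation to users, so you cannot add any filters to the link attach to take advantage of whatever mechanism Microsoft enables through this illegal filter definition.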
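
To make the "described type" point concrete, here is a minimal, dependency-free sketch of the AMQP 1.0 rule that each value in a link's filter set must be either null or a described type. It is not Qpid JMS code: the class FilterSetSketch and its Described helper are hypothetical names invented for illustration, and the filter name com.microsoft:session-filter and the plain string session id "order-42" are assumptions about what a session-aware receiver might send.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FilterSetSketch {

    /** Hypothetical stand-in for an AMQP described type: a descriptor plus the value it describes. */
    public static final class Described {
        final Object descriptor;
        final Object value;

        public Described(Object descriptor, Object value) {
            this.descriptor = descriptor;
            this.value = value;
        }
    }

    /** AMQP 1.0 filter-set rule: a filter value must be null or a described type. */
    public static boolean isLegalFilterValue(Object value) {
        return value == null || value instanceof Described;
    }

    /** Returns true only if every value in the filter map satisfies the rule above. */
    public static boolean isLegalFilterSet(Map<String, Object> filters) {
        for (Object value : filters.values()) {
            if (!isLegalFilterValue(value)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Assumption: the session filter carries a bare string, not a described type.
        Map<String, Object> sessionFilter = new LinkedHashMap<>();
        sessionFilter.put("com.microsoft:session-filter", "order-42");
        System.out.println(isLegalFilterSet(sessionFilter)); // prints false

        // The same value wrapped in a (hypothetical) descriptor passes the check.
        Map<String, Object> describedFilter = new LinkedHashMap<>();
        describedFilter.put("example:filter", new Described("example:descriptor", "order-42"));
        System.out.println(isLegalFilterSet(describedFilter)); // prints true
    }
}
```

The sketch only models the validation rule. It does not show a way to attach such a filter through the JMS API, because the client offers none.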