问题描述
我们通过提供createDurableSubscriber
和订户名称,以编程方式创建了clientId
的IBM MQ AMQP TOPIC订户。
我们启动了程序,因此它订阅了TOPIC并停止了该程序。然后将消息发送到主题,然后再次启动接收器程序,但是我们无法接收发送的消息并释放消息,这对于持久订阅是不应该发生的。
当使用mqsc命令disPLAY TOPIC
,disPLAY TPSTATUS
,disPLAY TPSTATUS SUB
,disPLAY SUB SUBID
连接订户时,但未看到订户程序停止时,我们可以看到amqp主题及其持久订阅。我们已经定义了属性DEFPSIST(YES)
,并且客户端(生产者到主题)正在发送持久消息。
消息消失了,因为我们无法在订阅者的持久队列中看到消息?是否取决于到期属性?
AMQ8096: WebSphere MQ subscription inquired.
SUBID("hex sub id")
SUB(:private:CLINET01:TOPIC01) TOPICSTR(TOPIC01)
TOPICOBJ(SYstem.BASE.TOPIC) disTYPE(RESOLVED)
DEST(SYstem.MANAGED.DURABLE.5F6B5C2524FB9AED)
DESTQMGR(qm.name) PUBAPPID( )
SELECTOR( ) SELTYPE(NONE)
USERDATA(010)
PUBACCT(***************************************************)
DESTCORL(***************************************************)
DESTCLAS(MANAGED) DURABLE(YES)
EXPIRY(0) PSPROP(MSGPROP)
PUBPRTY(ASPUB) REQONLY(NO)
SUBScopE(ALL) SUBLEVEL(1)
SUBTYPE(API) VARUSER(FIXED)
WSCHEMA(TOPIC) SUBUSER(mqm)
CRDATE(2020-09-28) CRTIME(04:14:09)
ALTDATE(2020-09-28) ALTTIME(04:14:09)
订户ID具有私有(不确定原因)和客户端ID,但没有订户名称Sub4。
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.String;
import javax.jms.Destination;
import javax.naming.Context;
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.DeliveryMode;
import javax.naming.InitialContext;
import javax.jms.Message;
public class AMQPQueueExample1 implements Runnable {
private static final int DELIVERY_MODE = DeliveryMode.PERSISTENT;
public void run(){
try{
Connection connection = null;
Context context = new InitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myFactoryLookup");
connection = connectionFactory.createConnection();
connection.setClientID("123");//("WHATS_MY_PURPOSE3"); // Why do we need clientID while publishing the TOPIC from consumer / publisher
Session session = connection.createSession(false,Session.AUTO_ACKNowLEDGE);
Topic pricetopic = (Topic) context.lookup("myTopicLookup1");
MessageConsumer subscriber1 = session.createDurableSubscriber(pricetopic,"sub420"); //"sub3");
System.out.println("TOPIC "+pricetopic);
connection.start();
while(true){
TextMessage message1 = (TextMessage) subscriber1.receive(1000);
if(message1!=null)
System.out.println("Subscriber 1 received : " + message1.getText());
}
}catch(Exception e){
e.printstacktrace();
}
}
public static void main(String[] args) {
AMQPQueueExample1 amp=new AMQPQueueExample1();
Thread thread = new Thread(amp);
thread.start();
}
}
值从jndi.properties文件中获取上下文工厂和提供者URL。
解决方法
从注释中看起来好像您正在使用MQ 8.0.0.5?如果是这种情况,则该版本的MQ不支持Apache Qpid JMS客户端。我相信使用该版本可以实现非常基本的非持久订阅 ,但是其他任何JMS方法都不太可能。
我怀疑那个版本的MQ无法完全理解Qpid JMS的AMQP 1.0流,因此订阅的有效期设置为0,而不是无限。
MQ 9.2增加了对更多JMS 2.0规范的支持-尽管不是每个JMS功能。这里提供了有关所支持方法的更多信息:
https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.2.0/com.ibm.mq.dev.doc/q125050_.htm
创建持久的订户和/或消费者应该可以按预期工作。
,马修·怀特海德(Matthew Whitehead)“ MQ Light messaging from Microsoft®.NET™ (Part 4)”的一篇文章指出:
AMQP频道不支持为MQ Light订阅设置无限的到期时间。虽然可以创建具有很长生存期的订阅,但无法创建永久存在的订阅。
如果您希望创建永不过期的订阅,则可以通过创建MQ管理的订阅并让MQ Light客户端加入并退出订阅来实现。这也有助于确保在第一个订阅者连接之前发布到某个主题的任何消息都不会完全丢失。阅读my previous article,了解如何将MQ Light客户端加入托管订阅。
AMQP相关字段
为提供上述到期功能,MQ Light使用AMQP 1.0的2个功能:
- 源超时
- 源有效期政策
源超时用于指定订阅将过期的时间(以秒为单位)。
源到期策略用于确定导致到期计时器开始的原因。 MQ AMQP通道仅支持链接分离的过期策略,这意味着计时器将在最后一个链接从预订分离后立即启动。
我搜索了有关如何在Apache QPID中设置“源超时”或“源过期策略”的参考,但是链接的博客参考通过管理定义的订阅来设置过期。根据您问题中的信息,我认为您可以提前定义类似的内容。我没有指定EXPIRY
,因为这将从EXPIRY(UNLIMITED)
中提取SYSTEM.DEFAULT.SUB
:
DEFINE SUB(':private:CLINET01:TOPIC01') TOPICOBJ(SYSTEM.BASE.TOPIC) TOPICSTR('TOPIC01') DESTCLAS(MANAGED)
然后,当您连接AMQP订户时,它将恢复该现有的订户,并将到期时间设置为UNLIMITED
。