如果连接未重新启动,则 MessageListener 仅读取一条消息

问题描述

我在阅读 IBM MQ (c# + IBM.xms + ibmcom/mq:latest) 上的第二条消息时遇到问题。 使用认队列“DEV.QUEUE.1”,它只会在第一条消息到达后连接停止并再次启动时继续监听消息(conn.Stop()/conn.Start())。如果侦听器运行时消息已经在队列中,则所有消息都被立即消费。

我用的是 Docker,这是 MQ 版本信息:(尝试了多个旧版本)

bash-4.4$ dspmqver
Name:        IBM MQ
Version:     9.2.2.0
Level:       p922-L210310.DE
BuildType:   IKAP - (Production)
Platform:    IBM MQ for Linux (x86-64 platform)  
Mode:        64-bit
O/S:         Linux 4.19.128-microsoft-standard   
O/S Details: Red Hat Enterprise Linux 8.3 (Ootpa)
InstName:    Installation1
InstDesc:    IBM MQ V9.2.2.0 (Unzipped)
Primary:     N/A
InstPath:    /opt/mqm
DataPath:    /mnt/mqm/data
MaxCmdLevel: 922
LicenseType: Developer

使用的代码

using IBM.xms;
... 
        public IConnection conn;
        static void Main(string[] args)
        {
            Program app = new Program();
            app.Setup();
            Console.ReadLine();

        }

        public void Setup()
        {
            xmsFactoryFactory xff = xmsFactoryFactory.GetInstance(xmsC.CT_WMQ);
            IConnectionFactory cf = xff.CreateConnectionFactory();
            cf.SetStringProperty(xmsC.WMQ_HOST_NAME,"localhost");
            cf.SetIntProperty(xmsC.WMQ_PORT,1414);// 9443);
            cf.SetStringProperty(xmsC.WMQ_CHANNEL,"DEV.ADMIN.SVRCONN");
            cf.SetIntProperty(xmsC.WMQ_CONNECTION_MODE,xmsC.WMQ_CM_CLIENT);
            cf.SetStringProperty(xmsC.WMQ_QUEUE_MANAGER,"QM1");
            cf.SetIntProperty(xmsC.WMQ_broKER_VERSION,xmsC.WMQ_broKER_V1);
            cf.SetStringProperty(xmsC.USERID,"admin");
            cf.SetStringProperty(xmsC.PASSWORD,"passw0rd"); 

            conn = cf.CreateConnection();
            Console.WriteLine("connection created");
            ISession sess = conn.CreateSession(false,AckNowledgeMode.AutoAckNowledge);
            IDestination dest = sess.CreateQueue("DEV.QUEUE.1");
            IMessageConsumer consumer = sess.CreateConsumer(dest);
            MessageListener ml = new MessageListener(OnMessage);
            consumer.MessageListener = ml;
            conn.Start();
            Console.WriteLine("Consumer started"); 
        }

        private void OnMessage(IMessage msg)
        {
            ITextMessage textMsg = (ITextMessage)msg;
            Console.WriteLine("Got a message: " + textMsg.Text); 
            conn.Stop();  // MUST BE CHANGED - for some reason,new messages are not being updated,so connection needs to be restarted
            conn.Start();
        }

谢谢

解决方法

这就是我的做法。您需要询问下一条消息,完成后将其从 MQ 中删除。

 public string ConnectMQ(string strQueueManagerName,string strQueueName,string strChannelInfo)
    {

        //

        QueueManagerName = strQueueManagerName;

        QueueName = strQueueName;

        ChannelInfo = strChannelInfo;

        //

        char[] separator = { '/' };

        string[] ChannelParams;

        ChannelParams = ChannelInfo.Split(separator);

        channelName = ChannelParams[0];
    
        transportType = ChannelParams[1];

        connectionName = ChannelParams[2];
     //   connectionName = "";

        String strReturn = "";

        try
        {

            queueManager = new MQQueueManager(QueueManagerName,channelName,connectionName);

            

            strReturn = "Connected Successfully";

        }



 public String BrowseMsg(String position,MQQueue queue)
    {
        String strReturn = "";

        try
        {

        
            queueMessage = new MQMessage();

            queueMessage.Format = MQC.MQFMT_STRING;

            queueGetMessageOptions = new MQGetMessageOptions();
            if (position.Equals("First"))
            {
                queueGetMessageOptions.Options = IBM.WMQ.MQC.MQGMO_BROWSE_FIRST;
            }

            if (position.Equals("Next"))
            {
                queueGetMessageOptions.Options = IBM.WMQ.MQC.MQGMO_BROWSE_NEXT;
                
            }

            if (position.Equals("Remove"))
            {
                queueGetMessageOptions.Options = IBM.WMQ.MQC.MQGMO_MSG_UNDER_CURSOR;
            }

            queue.Get(queueMessage,queueGetMessageOptions);

            strReturn = queueMessage.ReadString(queueMessage.MessageLength);

            queueMessage.MessageId = MQC.MQMI_NONE;
            queueMessage.CorrelationId = MQC.MQCI_NONE;
            // Optional,remove data from the message
            queueMessage.ClearMessage();

           

        }





 public void init()
    {

        try
        {

         
         

            QueueManagerName = ConfigurationManager.AppSettings["QueueManagerName"].ToString();
            QueueName = ConfigurationManager.AppSettings["QueueName"].ToString();
            ChannelInfo = ConfigurationManager.AppSettings["ChannelInfo"].ToString();

            String strConnectionSuccess = String.Empty;
            strConnectionSuccess = "false";
            String strMQResponse = String.Empty;
            XmlDocument xmlMQRequestResponse = new XmlDocument();

            strConnectionSuccess = ConnectMQ(QueueManagerName,QueueName,ChannelInfo);

          
           

            if (!strConnectionSuccess.Equals("Connected Successfully"))
            {
                return;
            }

          
            int count = 0;
            queue = queueManager.AccessQueue(QueueName,MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_BROWSE + MQC.MQOO_FAIL_IF_QUIESCING);

            if (strMQResponse.Equals("MQRC_GET_INHIBITED") || strMQResponse.Equals("Exception : MQRC_GET_INHIBITED"))
            {
               
                log4net.LogManager.GetLogger("RollingFileAppender").Debug(strMQResponse);
            }

            while (!strMQResponse.Equals("Exception : MQRC_NO_MSG_AVAILABLE") && !strMQResponse.Equals("MQRC_GET_INHIBITED") && !strMQResponse.Equals("Exception : MQRC_GET_INHIBITED")) 
           
            {

              

                if (count == 0)
                {
                    strMQResponse = BrowseMsg("First",queue);

                }
                else
                {
                    strMQResponse = BrowseMsg("Next",queue);
                }

                if (strMQResponse.Equals("Exception : MQRC_NO_MSG_AVAILABLE"))
                {
                    continue;
                }

                Boolean boosuccess = new Boolean();
               // if success then do something with the data here


             

                if (boosuccess.Equals(true))
                {
                    BrowseMsg("Remove",queue);
                }

                count++;

            }
          

        }