多线程:发布者线程和订阅者线程之间的混淆

问题描述

我创建了一个Windows窗体应用程序,该应用程序能够将OPC标记发布到MQTT代理中,现在,我正尝试进行相反的操作,将MQTT标记写入OPC服务器。当我同时启动了代理(发布者)和转移(订阅者)时,这两个线程执行与发布相同的工作,我不知道这是什么问题。 启动方法如下:

public byte Start()
    {
        try
        {
            byte connectResult;
            if (IsLWT == false)
            {


                connectResult = this._MqttClient.Connect(ClientID,Username,Password,IsCleanSession,KeepAlivePeriode);
            }
            else
            {             
                   
                connectResult = this._MqttClient.Connect(ClientID,willRetain,willQos,true,willTopic,willMessage,KeepAlivePeriode);

            }

            // 0 means that the connection suceeded
            if (connectResult == 0)
            {
                this.Rate = GetRateFromOPCGroups();
                this._publisherThread = new Thread(() => Publish());
                this._publisherThread.IsBackground = true;
                this._publisherThread.Start();
                Isstarted = true;
            }

            if (connectResult == 0)
            {
              //this.Rate = GetRateFromOPCGroups();
                this._SubscriberThread = new Thread(() => Subscribe(topics));
                this._SubscriberThread.IsBackground = true;
                this._SubscriberThread.Start();
                Isstarted = true;
            }

            return connectResult;

        }
        catch (IntegrationObjects.Networking.M2Mqtt.Exceptions.MqttClientException ex)
        {
            MQTTServiceLogger.TraceLog(MessageType.Error,MQTTServiceMessages.startAgentFailed(this.Name,ex.Message));
            return 11;
        }
        
        catch (IntegrationObjects.Networking.M2Mqtt.Exceptions.MqttCommunicationException ex)
        {
            MQTTServiceLogger.TraceLog(MessageType.Error,ex.Message));
            return 11;
        }
        catch (Exception ex)
        {
            MQTTServiceLogger.TraceLog(MessageType.Error,ex.Message));
            return 11;
        }
    }

1。这是发布代码: 私有无效Publish() {

        while (true)
        {
            if (Isstarted)
            {
                try
                {
                    if (_MqttClient.IsConnected)
                    {
                        isConnected = true;
                        if (this.OPCItems.Count != 0)
                        {

                            JsonMQTTMessage JsonMessage = new JsonMQTTMessage();
                            JsonMessage.Timestamp = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss.fff");
                            JsonMessage.listofValues = new List<UpdatedOPCItem>();

                          
                        
                            lock (lockOPCItems)
                            {
                                foreach (OPCItem Item in OPCItems.ToList())
                                {
                                    if (Item != null)
                                    {
                                        UpdatedOPCItem upItem = new UpdatedOPCItem();
                                        upItem.ID = Item.ItemID;
                                        upItem.value = Item.ItemCurrentValue;
                                        upItem.quality = Item.ItemQuality;
                                        upItem.timestamp = Item.ItemTimeStamp.ToString("yyyy/MM/dd HH:mm:ss.fff");
                                        upItem.DataType = Item.ItemDataType;
                                        JsonMessage.listofValues.Add(upItem);
                                    }

                                }
                            }
                           
                            var messagetopublish = Newtonsoft.Json.JsonConvert.SerializeObject(JsonMessage);
                            ushort res = _MqttClient.Publish(Topic,Encoding.UTF8.GetBytes(messagetopublish),Qos,Retain);
                            ResetValues();
                            Thread.Sleep(Rate);
  1. 这是订阅代码

public void订阅(列出主题) {

        while (true)
        {
            if (Isstarted)
            {
                try
                {
                    if (_MqttClient.IsConnected)
                    {
                        isConnected = true;
                        foreach (string topic in topics)
                        {
                            ushort msggId = _MqttClient.Subscribe(new string[] { $"{ topic }" },new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
                        }
                        Thread.Sleep(Rate);

                    }
                    else
                   

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)