当我尝试处理轮询器时 NetMQ 卡住了请求-回复模式

问题描述

这是我第一个使用 NetMQ (ZMQ) 框架的项目,所以,可能我没有完全理解如何使用它。

我创建了一个包含两个应用程序的 Windows 窗体项目,一个向另一个发送“ping”并接收“pong”作为答案。该协议并不复杂,并使用请求-回复模式,所有命令都有一个标识目标的第一部分,如“查询”或“通知”,以及包含命令或消息本身的第二部分。在这种情况下,一个应用发送一个“query-ping”,另一个应用发送“inform-pong”。

我创建了一个类来封装所有的脏作业,所以主窗体可以非常简单地使用协议。一切正常,但是当我尝试关闭应用程序时,它卡在轮询器中并且应用程序永远不会关闭。在 Visual Studio 中,我可以看到暂停和停止按钮,但没有出现任何异常或错误:

enter image description here

当我单击暂停按钮时,我收到此消息(应用程序处于中断模式):

enter image description here

如果我点击“继续执行”,应用会回到相同的状态并且永远不会关闭。

如果我删除轮询器,应用程序会正常关闭,但当然,轮询器不起作用,应用程序也不再应答。

这是来自 Form1 的代码:

using CommomLib;
using System;
using System.Windows.Forms;

namespace Test1
{
    public partial class Form1 : Form
    {

        ZmqCommunication zmqComm = new ZmqCommunication();
        int portNumber;
        string status;

        public Form1()
        {
            InitializeComponent();
            
            InitializeZmqComm();
        }

        public void InitializeZmqComm()
        {
            // Calls the ZMQ initialization.
            portNumber = zmqComm.InitializeComm();
            if (portNumber == 0)
                status = "Ini error";
            else
                status = "Ini ok";
        }

        // Executes a ping command.
        private void button1_Click(object sender,EventArgs e)
        {
            richTextBox1.Clear();
            richTextBox1.AppendText(zmqComm.RequestPing(55001) + "\n");
        }


    }

}

这是来自我的 NetMQ 类的代码。它在一个单独的库项目中。在我的 Dispose 方法中,我尝试了 Remove、Dispose 和 StopAsync 的所有组合,但没有任何效果:

using NetMQ;
using NetMQ.Sockets;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace CommomLib
{
    public class ZmqCommunication
    {
        ResponseSocket serverComm = new ResponseSocket();
        NetMQPoller serverPoller;

        int _portNumber;

        public ZmqCommunication()
        {
            _portNumber = 55000;
        }

        // Problem here! The serverPoller gets stuck. 
        public void Dispose()
        {
            //serverPoller.RemoveAndDispose(serverComm);
            //serverComm.Dispose();

            if (serverPoller.IsDisposed)
                Debug.WriteLine("A");

            //serverPoller.RemoveAndDispose(serverComm);
            serverPoller.Remove(serverComm);
            //serverPoller.StopAsync();
            serverPoller.Dispose();

            serverComm.Dispose();

            if (serverPoller.IsDisposed)
                Debug.WriteLine("B");

            Thread.Sleep(500);

            if (serverPoller.IsDisposed)
                Debug.WriteLine("C");

            Thread.Sleep(500);

            if (serverPoller.IsDisposed)
                Debug.WriteLine("D");

        }

        // ZMQ initialization.
        public int InitializeComm()
        {
            bool ok = true;
            bool tryAgain = true;

            // Looks for a port.
            while (tryAgain && ok)
            {
                try
                {
                    serverComm.Bind("tcp://127.0.0.1:" + _portNumber);
                    tryAgain = false;
                }
                catch (NetMQ.AddressAlreadyInUseException)
                {
                    _portNumber++;
                    tryAgain = true;
                }
                catch
                {
                    ok = false;
                }
            }

            if (!ok)
                return 0;   // Error.


            // Set up the pooler.
            serverPoller = new NetMQPoller { serverComm };
            serverComm.ReceiveReady += (s,a) =>
                {
                    RequestInterpreter();
                };

            // start polling (on this thread)
            serverPoller.RunAsync();

            return _portNumber;
        }

        // Message interpreter.
        private void RequestInterpreter()
        {

            List<string> message = new List<string>();

            if (serverComm.TryReceiveMultipartStrings(ref message,2))
            {
                if (message[0].Contains("query"))
                {
                    // Received the command "ping" and answers with a "pong".
                    if (message[1].Contains("ping"))
                    {
                        serverComm.SendMoreFrame("inform").SendFrame("pong");
                    }

                }
            }

        }

        // Send the command "ping".
        public string RequestPing(int port)
        {

            using (var requester = new RequestSocket())
            {
                Debug.WriteLine("Running request port {0}",port);

                requester.Connect("tcp://127.0.0.1:" + port);

                List<string> msgResp = new List<string>();

                requester.SendMoreFrame("query").SendFrame("ping");

                if (requester.TryReceiveMultipartStrings(new TimeSpan(0,10),ref msgResp,2))
                {
                    if (msgResp[0].Contains("inform"))
                    {
                        return msgResp[1];
                    }
                }

            }

            return "error";

        }


    }
}

解决方法

你可以尝试调用 NetMQConfig.Cleanup();在窗口关闭事件中?

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...