ZMQ PUB / SUB套接字在低负载下具有高延迟

问题描述

我正在使用NetMQ测试ZMQ PUB / SUB套接字的性能。

在下面的测试程序中,SUB套接字用作服务器来接收消息,而PUB套接字用作客户端来接收消息。消息的主题只是一个序列号,消息的内容是PUB客户端发送消息的时间。 SUB服务器通过比较接收消息的时间和消息内部的时间(消息发送时间)来测量延迟。

对于在大约1秒钟内发送的1,000条消息,我发现的性能结果是平均延迟〜4ms和最大延迟〜40ms。但是,基于http://wiki.zeromq.org/results:more-precise-0mq-tests,ZMQ的等待时间应为微秒级,比我测得的等待时间要短得多。

我是否错误地使用ZMQ PUB / SUB插座?有什么办法可以减少延迟?任何想法都会受到欢迎和赞赏。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

namespace NetMQ_Hello
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length == 0)
            {
                Console.WriteLine("Please use \"server\" or \"client\" as the first argument.");
                return;
            }

            string mode = args[0];
            if (mode == "server")
            {
                RunSubAsServer();
            }
            else if (mode == "client")
            {
                RunPubAsClient();
            }
        }

        static void RunPubAsClient()
        {
            string address = "tcp://localhost:1234";
            PublisherSocket pubSocket = new PublisherSocket();
            pubSocket.Connect(address);
            Thread.Sleep(10);

            Console.WriteLine("Starting to send messages...");
            DateTime startTime = DateTime.UtcNow;
            for (int i = 0; i < 1000; i++)
            {
                long data = DateTime.UtcNow.ToBinary();

                byte[][] frames = new byte[2][] { BitConverter.GetBytes(i),BitConverter.GetBytes(data) };
                pubSocket.SendMultipartBytes(frames);
                Console.WriteLine(string.Format("Sent: {0},{1}",i,data));
            }
            DateTime endTime = DateTime.UtcNow;
            Console.WriteLine(string.Format("1,000 messages sent in {0}",endTime - startTime));

            Console.WriteLine("Press enter to exit...");
            Console.ReadLine();

            pubSocket.Disconnect(address);
        }

        static void RunSubAsServer()
        {
            string address = "tcp://*:1234";
            SubscriberSocket subSocket = new SubscriberSocket();
            subSocket.Bind(address);
            subSocket.SubscribeToAnyTopic();
            Thread.Sleep(10);

            // Stats
            List<TimeSpan> latencies = new List<TimeSpan>();

            Console.WriteLine("Starting to receive messages...");
            List<byte[]> frames = new List<byte[]>();
            while (true)
            {

                if (subSocket.TryReceiveMultipartBytes(
                    timeout: TimeSpan.FromSeconds(1),frames: ref frames,expectedFrameCount: 2))
                {
                    DateTime now = DateTime.UtcNow;

                    int topic = BitConverter.ToInt32(frames[0],0);
                    DateTime sentTime = DateTime.FromBinary(BitConverter.ToInt64(frames[1],0));
                    TimeSpan latency = now - sentTime;
                    Console.WriteLine(string.Format("Received: {0},{1},delay {2}",topic,sentTime,latency));
                    latencies.Add(latency);

                    if (topic == 1000 - 1)
                    {
                        break;
                    }
                }

            }

            int n = latencies.Count;
            double max = latencies.Max().TotalMilliseconds;
            double mean = latencies.Sum(s => s.TotalMilliseconds) / n;

            Console.WriteLine(String.Format("Latency\nMax: {0}ms\nMean: {1}ms",max,mean));

            Console.WriteLine("Press enter to exit...");
            Console.ReadLine();

            subSocket.Unbind(address);
        }
    }
}

子服务器的示例输出:

Received: 990,9/30/2020 12:53:11 PM,delay 00:00:00.0010009
Received: 991,delay 00:00:00
Received: 992,delay 00:00:00.0019993
Received: 993,delay 00:00:00.0010016
Received: 994,delay 00:00:00
Received: 995,delay 00:00:00.0019993
Received: 996,delay 00:00:00.0010007
Received: 997,delay 00:00:00
Received: 998,delay 00:00:00
Received: 999,delay 00:00:00.0030002
Latency
Max: 39ms
Mean: 4.49101730769231ms

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)