在 C# 中生成时间戳时,Thread.Sleep 的延迟会导致问题

问题描述

由于一个项目,我正在生成一些合成数据,用于构建用于分析目的的基础设施。

基础设施是通过数据模拟建立起来的,用于生成合成数据。这些数据通过 UDP 发送到 node-red 服务器,然后通过 mqtt 移交给 kafka。

我只是生成一个带有时间戳的布尔值。现在我想分析这些时间戳之间的时间。数据应该在一段时间之间生成。所以这是一个例子:

Data A:
{
isActivated: false,Timestamp: "xxxxx"
}

Data B:
{
isActivated: true,Timestamp: "xxxxx+deltaTime"
}

所以B.Timestamp-A.Timestamp = deltaTime。到目前为止一切顺利,但是当我添加 Thread.Sleep(delay) 时,延迟将被添加到从 Kafka-Consumer 计算的 deltaTime ......(我在消息中获得了正确的时间戳进行计算,而不是那些产生来自 kafka 本身。我通过将 2 天添加到 DataGenerator 中生成的时间戳来测试它)

这里是代码示例:

public class CustomData
{
    public DateTime Timestamp { get; set; }
    public bool isActivated { get; set; }
}


public class DataGenerator 
{
    private bool DataAActivated { get; set; }
    private IPAddress ipAddress = IPAddress.Parse("XXX.XXX.XXX.XXX");
    private UdpClient udpClient = new UdpClient();
    private IPEndPoint iPEndPoint = new IPEndPoint(ipAddress,XXXXX);
    
    public DataGenerator (bool dataAActivated)
    {
         DataAActivated = dataAActivated;
    }

    public void GenerateData(double delay,int deltaTime) 
    {
        DateTime ts0 = DateTime.Now;
        DateTime ts2 = ts0.AddMilliseconds(deltaTime);
    
        if (DataAActivated) 
        {
            CustomData dataA = new CustomData();
            dataA.isActivated = false;
            dataA.Timestamp = ts0;

            CustomData dataB = new CustomData();
            dataB.isActivated = true;
            dataB.Timestamp = ts2;
        } 
        else
        {
            CustomData dataB = new CustomData();
            dataB.isActivated = false;
            dataB.Timestamp = ts0;

            CustomData dataA = new CustomData();
            dataA.isActivated = true;
            dataA.Timestamp = ts2;
        }
    
        // this is causing issues
        Thread.Sleep((int)delay);

        SendData(dataA);
        SendData(dataB);
    }
    
    private void SendData(CustomData data)
        {
            udpClient.Connect(iPEndPoint);

            byte[] jsonUtf8Bytes;
            var options = new JsonSerializerOptions
            {
                WriteIndented = true
            };
            jsonUtf8Bytes = System.Text.Json.
                              JsonSerializer.SerializetoUtf8Bytes(data,options);
            
            udpClient.Send(jsonUtf8Bytes,jsonUtf8Bytes.Length);
            udpClient.Close();
        }
}
 
---------------- within Kafka-Consumer ----------------
public double CalcDuration( CustomData dataA,CustomData dataB)
{
    double duration = dataB.Timestamp.Subtract(dataA.Timestamp).TotalMilliseconds;
    Console.WriteLine($"duration: {duration}");
    return duration;
}

如果 deltaTime 为 100 毫秒,延迟为 500 毫秒,输出将如下所示 不带 Thread.Sleep(delay):

duration: 100ms
duration: 100ms
...

使用 Thread.Sleep(延迟):

duration: 600ms
duration: 600ms
...

有人告诉我如何解决这个问题吗?

我希望我能说清楚,但不要犹豫,让我知道编辑这篇文章

非常感谢,

问候

解决方法

我发现了这个问题。正如彼得和玛丽安提到的,我在问题中发布的代码没有导致失败。

这让我开始了解其他服务的代码。通过清理整个kafka-cluster相关topic,重新开始生产,看到Producer/Consumer链的第二个服务我设置的初始状态错误。

所以第一条消息被跳过,因此时间戳之间的计算是延迟之前和之后的。那是因为我为持续时间的计算设置的约束来自相同的偏移量。

所以要说清楚: 发送到 kafka 的数据到达主题“摄取” 会有这样的消息:

ingestion:
(1)dataA
(2)dataB
(3)dataA
(4)dataB
...

从那以后,我将它们分派到两个主题“activated”和“notActivated”中,因为来自 notActivated 数据的时间戳每次都大于来自 activate 的时间戳。为了确保没有丢包或发生任何事情,我通过在调度程序中保存最后一个状态(与来自 CustomData 的 isActive 属性相关)来证明它。那个状态最初设置错误,所以发生的事情是:

notActivated:
(1)dataA //--> this entry was not there because it was skipped refering to false state matching
(4)dataB

activated:
(2)dataB
(3)dataA

所以持续时间的计算是

duration = (4)dataB - (2)dataB 

因此,持续时间是:

duration = deltaTime + delay

再次感谢彼得和玛丽安

莫迪斯