问题描述
由于一个项目,我正在生成一些合成数据,用于构建用于分析目的的基础设施。
基础设施是通过数据模拟建立起来的,用于生成合成数据。这些数据通过 UDP 发送到 node-red 服务器,然后通过 mqtt 移交给 kafka。
我只是生成一个带有时间戳的布尔值。现在我想分析这些时间戳之间的时间。数据应该在一段时间之间生成。所以这是一个例子:
Data A:
{
isActivated: false,Timestamp: "xxxxx"
}
Data B:
{
isActivated: true,Timestamp: "xxxxx+deltaTime"
}
所以B.Timestamp-A.Timestamp = deltaTime
。到目前为止一切顺利,但是当我添加 Thread.Sleep(delay)
时,延迟将被添加到从 Kafka-Consumer 计算的 deltaTime ......(我在消息中获得了正确的时间戳进行计算,而不是那些产生来自 kafka 本身。我通过将 2 天添加到 DataGenerator 中生成的时间戳来测试它)
这里是代码示例:
public class CustomData
{
public DateTime Timestamp { get; set; }
public bool isActivated { get; set; }
}
public class DataGenerator
{
private bool DataAActivated { get; set; }
private IPAddress ipAddress = IPAddress.Parse("XXX.XXX.XXX.XXX");
private UdpClient udpClient = new UdpClient();
private IPEndPoint iPEndPoint = new IPEndPoint(ipAddress,XXXXX);
public DataGenerator (bool dataAActivated)
{
DataAActivated = dataAActivated;
}
public void GenerateData(double delay,int deltaTime)
{
DateTime ts0 = DateTime.Now;
DateTime ts2 = ts0.AddMilliseconds(deltaTime);
if (DataAActivated)
{
CustomData dataA = new CustomData();
dataA.isActivated = false;
dataA.Timestamp = ts0;
CustomData dataB = new CustomData();
dataB.isActivated = true;
dataB.Timestamp = ts2;
}
else
{
CustomData dataB = new CustomData();
dataB.isActivated = false;
dataB.Timestamp = ts0;
CustomData dataA = new CustomData();
dataA.isActivated = true;
dataA.Timestamp = ts2;
}
// this is causing issues
Thread.Sleep((int)delay);
SendData(dataA);
SendData(dataB);
}
private void SendData(CustomData data)
{
udpClient.Connect(iPEndPoint);
byte[] jsonUtf8Bytes;
var options = new JsonSerializerOptions
{
WriteIndented = true
};
jsonUtf8Bytes = System.Text.Json.
JsonSerializer.SerializetoUtf8Bytes(data,options);
udpClient.Send(jsonUtf8Bytes,jsonUtf8Bytes.Length);
udpClient.Close();
}
}
---------------- within Kafka-Consumer ----------------
public double CalcDuration( CustomData dataA,CustomData dataB)
{
double duration = dataB.Timestamp.Subtract(dataA.Timestamp).TotalMilliseconds;
Console.WriteLine($"duration: {duration}");
return duration;
}
如果 deltaTime 为 100 毫秒,延迟为 500 毫秒,输出将如下所示 不带 Thread.Sleep(delay):
duration: 100ms
duration: 100ms
...
使用 Thread.Sleep(延迟):
duration: 600ms
duration: 600ms
...
有人告诉我如何解决这个问题吗?
我希望我能说清楚,但不要犹豫,让我知道编辑这篇文章。
非常感谢,
问候
解决方法
我发现了这个问题。正如彼得和玛丽安提到的,我在问题中发布的代码没有导致失败。
这让我开始了解其他服务的代码。通过清理整个kafka-cluster相关topic,重新开始生产,看到Producer/Consumer链的第二个服务我设置的初始状态错误。
所以第一条消息被跳过,因此时间戳之间的计算是延迟之前和之后的。那是因为我为持续时间的计算设置的约束来自相同的偏移量。
所以要说清楚: 发送到 kafka 的数据到达主题“摄取” 会有这样的消息:
ingestion:
(1)dataA
(2)dataB
(3)dataA
(4)dataB
...
从那以后,我将它们分派到两个主题“activated”和“notActivated”中,因为来自 notActivated 数据的时间戳每次都大于来自 activate 的时间戳。为了确保没有丢包或发生任何事情,我通过在调度程序中保存最后一个状态(与来自 CustomData 的 isActive 属性相关)来证明它。那个状态最初设置错误,所以发生的事情是:
notActivated:
(1)dataA //--> this entry was not there because it was skipped refering to false state matching
(4)dataB
activated:
(2)dataB
(3)dataA
所以持续时间的计算是
duration = (4)dataB - (2)dataB
因此,持续时间是:
duration = deltaTime + delay
再次感谢彼得和玛丽安
莫迪斯