问题描述
我有来自两个不同传感器输入的实时时间序列数据。我想根据时间戳安排数据。
问题是传感器输入不一致,例如
time data1_1 data1_2 data1_3 data1_4 data2_1 data2_2 data2_3 data2_4
['0','0.1234','0.3251','1.4235','0.4234','0.6543','0.3231','1.1235','0.3434']
['1','0.1432','0.3452','1.4245','0.1434','0.1544','0.3345','1.5425','0.6534']
['2','0.1654','0.3243','1.5445','0.5432','0.4551','1.8755','0.4245']
如何(在我接收数据的同时)加入它们,以便合并具有相同时间戳的传感器数据,如下所示:
process = subprocess.Popen(['node','server.js'],stdout=subprocess.PIPE)
while True:
Byte_data = process.stdout.readline()
string_data = byte_data.rstrip().decode("utf-8") # From byte to string
list_data = string_data.split(",") # Comma separate strings
# Sort data here...
我通过子流程(一次一行)接收数据:
{{1}}
解决方法
您需要回头查看并匹配时间戳记,诸如此类。
- 从您提出的问题来看,时间戳似乎是匹配的。如果不是,而是只是命令的话,事情就会变得更加复杂。
- 如果缺少某些帧,它们将无限期地停留在回溯命令中。如果该设备运行了很长时间,则可能需要定期清除旧框架。
def time(row):
return row[1]
def synchronized(rows):
lookback = {'sensor_1': {},'sensor_2': {}}
for row in rows:
# add row to lookback
sensor = row[0]
lookback[sensor][time(row)] = row
# check for matching timestamps
rms = []
for t,r in sorted(lookback['sensor_1'].items(),key=time):
if t in lookback['sensor_2']:
yield r,lookback['sensor_2']
rms.append(t)
for rm in rms:
for tmp in lookback.values():
del tmp[rm]
def parse(rows):
for row in rows:
yield tuple(row.decode().rstrip().split(','))
import subprocess
process = subprocess.Popen(['node','server.js'],stdout=subprocess.PIPE)
for row1,row2 in synchronized(parse(process.stdout)):
print(row1,row2)
,
看到时间戳都是整数,您可以将它们用作排序列表的索引,并在等于时间戳的索引处添加或添加数据。
sorted_data = []
process = subprocess.Popen(['node',stdout=subprocess.PIPE)
while True:
Byte_data = process.stdout.readline()
string_data = byte_data.rstrip().decode("utf-8") # From byte to string
list_data = string_data.split(",") # Comma separate strings
try:
sorted_data[int(list_data[1])] += list_data[2:]
except IndexError:
sorted_data.append(list_data[1:])
# prints sorted data in real time
print(str(sorted_data).replace("],","],\n"))
# prints out final list with sorted data
print(str(sorted_data).replace("],\n"))