如何调度实时循环任务?

问题描述

我们是一个由本科生组成的团队,目前正在致力于构建一个有腿的机器人。目前,我们与机器人的界面是使用我们正在使用的主板上的 sdk 用 Python 编写的。

为了与主板sdk通信,我们需要每毫秒发送一个命令。

为了让我们能够定期发送任务,我们在我们的 linux 内核中应用了 rt-preempt 补丁。 (Ubuntu LTS 20.04,内核 5.10.27-rt36)

我们对编写实时应用程序非常陌生,并且遇到了一些问题,我们的任务有时会比指定的时间步长小得多。在下图中,我们绘制了将命令发送到 sdk 的 while 循环的每个循环的时间。 (x 轴是以秒为单位的时间,y 轴是迭代的经过时间,也是以秒为单位) 从图中可以看出,一步比其余步小得多。每次运行脚本时,这似乎都发生在相同的确切时间标记上。

cyclic_task_plot

我们使用以下方法设置整个脚本的优先级:

pid = os.getpid()
sched = os.SCHED_FIFO
param = os.sched_param(98)
os.sched_setscheduler(pid,sched,param)

我们的循环任务如下所示:

dt 设置为 0.001

while(_running):
    if direction:
        q = q + 0.0025
        if (q > np.pi/2).any():
            direction = False
    else:
        q = q - 0.0025
        if (q < -np.pi/2).any():
            direction = True

    master_board.track_reference(q,q_prime)


    #Terminate if duration has passed
    if (time.perf_counter() - program_start > duration):
        _running = False

    cycle_time = time.perf_counter() - cycle_start
    time.sleep(dt - cycle_time)
    cycle_start = time.perf_counter()

    timestep_end = time.perf_counter()
    time_per_timestep_array.append(timestep_end - timestep_start)
    timestep_start = time.perf_counter()

我们怀疑这个问题与我们定义睡眠量的方式有关。 Cycle_time 是上面 time.sleep() 计算所用的时间,因此:睡眠时间 + 循环时间 = 1ms。但是,我们不确定如何正确执行此操作,并且正在努力寻找有关该主题的资源。

应该如何为实时应用正确定义这样的任务?

我们有相当宽松的要求(几毫秒),但它是确定性的对我们来说非常重要,因为这是我们论文的一部分,我们需要了解发生了什么。

非常感谢对我们的问题或相关资源的任何回答。

完整代码链接https://drive.google.com/drive/folders/12KE0EBaLc2rkTZK2FuX_goMF4MgWtknS?usp=sharing

解决方法

    timestep_end = time.perf_counter()
    time_per_timestep_array.append(timestep_end - timestep_start)
    timestep_start = time.perf_counter()

您正在记录上一周期的 timestep_start 和当前周期的 timestep_end 之间的时间。这个间隔不能准确代表周期时间步长(即使我们假设没有发生任务抢占);它不包括数组追加函数消耗的时间。由于每次运行脚本时异常值似乎都发生在相同的确切时间标记,我们可以怀疑此时数组超过了特定大小,必须进行昂贵的内存重新分配。不管真正的原因是什么,您都应该通过记录周期开始之间的时间来消除这种计时误差:

    timestep_end = cycle_start
    time_per_timestep_array.append(timestep_end - timestep_start)
    timestep_start = cycle_start