大对象的进程间通信

问题描述

我正在尝试用python创建一个在线(实时)视频处理应用程序。输入视频序列是从输入摄像机流中实时获得的,并且需要实时处理帧,一次处理一个批次/窗口。一批帧的处理比连续帧之间的时间间隔要慢得多,因此需要在输入和输出并行度之间具有恒定延迟的情况下实现实时处理。为了实现此目标,我想到了使用多重处理来创建两个过程的过程-一个过程,该过程在另一个过程(生产者)在下一个框架窗口中读取时在框架的连续窗口(即消费者)上完成其工作从相机输入(如有必要,以降低的帧频)。

为了检验这个想法,我在python中编写了以下示例,该示例尝试通过IPC Pipe()从生产者向消费者传达一帧信息(一个numpy数组),其分辨率以Size为特征。

from multiprocessing import Process,Pipe
import numpy as np

Size = 256


def producer(conn):
    print("Child entered")
    window1 = np.arange(Size * Size).reshape((Size,Size))
    print("Array created")
    conn.send(window1)
    print("Data added to Pipe")
    return


if __name__ == '__main__':
    conn1,conn2 = Pipe()
    window_reader = Process(target=producer,args=(conn2,))
    window_reader.start()
    window_reader.join()
    print("Child process exited,entered parent")
    # Acting like a consumer
    a = conn1.recv()
    print(a)

对于Size = 256,观察到的输出是-

孩子输入了
数组已创建

此后,处理冻结,并且必须强制终止程序,这表明生产者进程在尝试写入Pipe时被阻塞。 python multiprocessing library除了Queue()还提供了Pipe()方法documentation表示Queue()按照以下方式工作的相同状态-

将对象放入队列时,将其腌制,然后后台线程将腌制的数据刷新到底层管道。

但是,如果我在上述示例中将Pipe()替换为Queue(),则获得以下python代码-

from multiprocessing import Process,Queue
import numpy as np

Size = 256


def producer(mp_queue):
    print("Child entered")
    window1 = np.arange(Size * Size).reshape((Size,Size))
    print("Array created")
    mp_queue.put(window1)
    print("Data added to Pipe")
    return


if __name__ == '__main__':
    q = Queue()
    window_reader = Process(target=producer,args=(q,entered parent")
    # Acting like a consumer
    a = q.get()
    print(a)

然后(令人惊讶地)输出与以前的管道情况不同,因为Queue()Pipe()不同,能够通过put() / send()阶段,提供输出-

孩子输入了
数组已创建
数据已添加到管道

Size足够小(例如Size=64)时,程序将在生产者和消费者过程之间成功传输框架,从而按预期完成执行。 Size=256及更高版本的执行阻塞似乎是由于Linux上IPC管道大小受到系统范围限制的结果。 This answer建议对Pipes的大小没有上限,并且可以通过运行cat /proc/sys/fs/pipe-max-size获得认上限。运行此命令时,我得到的值为1MB(打印的值为1048576字节)。这似乎是有道理的,因为在框架中有np.int64个条目以及多处理库的内部簿记产生的一些开销,任何大于64的Size值都会推动它。

因此,我的问题是-

  1. 我是否可以安全地(如果有的话)通过编辑/proc/sys/fs/pipe-max-size增加系统上IPC管道的限制。
  2. 是否正在增加系统上Pipes的大小,甚至可以实现我的目标?由于1MB认限制可能存在,因此有充分的理由。
  3. 其他IPC机制是否更适合我的情况?我想从生产者向消费者发送成批的帧,每个窗口或批的估计大小在4-8 MB之间,外加开销
  4. 我之所以选择多处理(mp)而不是多线程(mt),是因为前者保证了进程之间的内核级别分离,而使用python多线程库,我们可以end up with user level threads无法利用多核。当对视频进行顺序处理时,它不是实时的,因此我正在寻找一种利用多核的方法
  5. 在此应用程序中,由于一个进程写入而另一个进程仅读取信息,因此实际上不需要共享内存。这是首选mp而不是mt的另一个原因,因为据我了解,当需要共享内存(堆在同一进程的内核线程之间共享)时,我们更喜欢后者。考虑到管道的局限性,在这方面的建议也将不胜感激。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)