Reading multiple video streams with multiprocessing?

Problem description

I am using pafy to stream a set of YouTube videos, with the aim of combining them (split-screen style) and displaying them as one video. It works, but the frame rate becomes very slow with more than two videos, because a frame is fetched from each stream in turn; when I try 9 videos (for a 3x3 grid), fetching the frames takes 0.1725 seconds, which is too slow.

I figured the best way to reduce this would be to fetch the streams in parallel / with multiprocessing.

I tried using Pipes and multiprocessing, but I get EOFError: Ran out of input.

See the code below; in the frames = line, switch between the commented-out method (which works but is slow) and my multiprocessing attempt.

import multiprocessing
import cv2
import numpy as np
import pafy
import typing
import timeit

urls = [
    "https://www.youtube.com/watch?v=tT0ob3cHPmE",
    "https://www.youtube.com/watch?v=XmjKODQYYfg",
    "https://www.youtube.com/watch?v=E2zrqzvtWio",
    "https://www.youtube.com/watch?v=6cQLNXELdtw",
    "https://www.youtube.com/watch?v=s_rmsH0wQ3g",
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",
    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A",
    "https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920,1080


def main():
    streams = [pafy.new(url).getbest() for url in urls]

    videos = [cv2.VideoCapture() for _ in streams]

    [video.open(best.url) for video,best in zip(videos,streams)]

    cv2.namedWindow('Video',cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video',cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN)

    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1],(dim[0] // width,dim[1] // width)) for video in videos]
        frames = get_frames(videos)
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        cv2.imshow('Video',dst)

        if cv2.waitKey(1) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)

        continue

    [video.release() for video in videos]
    cv2.destroyAllWindows()


def get_frames(videos):
    # frames = [video.read()[-1] for video in videos]

    jobs = []
    pipe_list = []
    for video in videos:
        recv_end,send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame,args=(video,send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames


def get_frame(video,send_end):
    send_end.send(video.read()[1])
    # send_end.send(cv2.resize(video.read()[1],dim[1] // width)))


def merge_frames(frames: typing.List[np.ndarray]):
    width = np.math.ceil(np.sqrt(len(frames)))
    rows = []
    for row in range(width):
        i1,i2 = width * row,width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    return np.vstack(rows)


if __name__ == '__main__':
    main()

Solution

Interesting application! About the error: I ran your code and the message was that it cannot pickle the VideoCapture object (see the link further down), which is probably why the receiving pipe is empty. There are two errors from the two processes: first the pickle error, then the EOF.
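To make the failure concrete, here is a minimal sketch (the grab helper and the video.mp4 placeholder are mine, not part of the posted code): passing a URL or file path and opening the VideoCapture inside the child process avoids the pickling step entirely, whereas passing the cv2.VideoCapture object itself as a Process argument is what triggers the error shown further below.

import multiprocessing
import cv2

def grab(url, send_end):
    # Open the capture inside the child; only the URL string gets pickled
    cap = cv2.VideoCapture(url)
    ret, frame = cap.read()
    send_end.send(frame if ret else None)
    cap.release()

if __name__ == '__main__':
    url = "video.mp4"  # placeholder; a stream URL from pafy works the same way
    recv_end, send_end = multiprocessing.Pipe(False)
    # args=(cv2.VideoCapture(url), send_end) would instead raise
    # "TypeError: cannot pickle 'cv2.VideoCapture' object" under spawn (Windows),
    # followed by "EOFError: Ran out of input" in the child.
    p = multiprocessing.Process(target=grab, args=(url, send_end))
    p.start()
    frame = recv_end.recv()  # receive before join so a large frame cannot block the pipe
    p.join()
    print(frame is not None)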

Edit #2: I managed to run it with one process per video, etc.

Regarding performance: at first I did not merge the images (I still had to fix a few details), just to check whether frames were being received, and with 3 and 4 streams, each shown in a window separate from the receiving process, playback was very fast, faster than real time (tested with 3-4 streams). I think the merging and resizing for display is the slow part: with 4 streams (4x1280x720) the merged picture is 2560x1440, and in my case it is then resized to fit the screen.
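One way to cut that merging/resizing cost (my own sketch, not something in the posted code): shrink each frame to its final tile size inside the worker process, so the pipes and the hstack/vstack in the main loop handle much smaller images. Here tile_w and tile_h are hypothetical names, computed the same way as dim and width above.

import cv2

# Hypothetical tile size: a 3x3 grid filling a 1920x1080 window
tile_w, tile_h = 1920 // 3, 1080 // 3

def get_frame_resized(videoURL, send_end):
    # Same loop as the per-stream worker, but frames are resized before sending
    v = cv2.VideoCapture(videoURL)
    while True:
        ret, frame = v.read()
        if not ret:
            break
        send_end.send(cv2.resize(frame, (tile_w, tile_h)))
    v.release()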

Thanks for sharing that question and that library, etc.!

(By the way, I also tried using a lock at first, but it turned out not to be necessary. The code still needs to be cleaned up from some experiments. Also, the current implementation may not be synchronized per frame for each stream, because it does not join per frame like your original example did, which created new processes to grab one frame from each stream and then merged them.)
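One rough way to re-align the streams without joining per frame (again my own sketch, not part of the code below): before merging, drain each receiving pipe down to its newest frame, so a stream that produced several frames since the last display does not lag behind.

def latest_frame(recv_end):
    # Block for at least one frame, then discard everything but the newest one
    frame = recv_end.recv()
    while recv_end.poll():
        frame = recv_end.recv()
    return frame

# in the main loop, instead of frames = [x.recv() for x in pipes]:
# frames = [latest_frame(x) for x in pipes]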

The CPU load is mostly in the main process (4-core CPU, so the maximum per instance is 25%).

Sample timings:

0.06684677699999853
0.030737616999999773
1.2829999995744856e-06
LEN(FRAMES)= 9
0.06703700200000284
0.030708104000002123
6.409999997458726e-07
LEN(FRAMES)= 9

The waitKey delay in the main loop can be adjusted.
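For example (a sketch meant as a drop-in inside the main loop above, with a hypothetical target_fps), the delay can be derived from the target frame rate minus the time already spent grabbing and merging:

target_fps = 25  # hypothetical target
elapsed_ms = (timeit.default_timer() - start_time) * 1000
delay = max(1, int(1000 / target_fps - elapsed_ms))
if cv2.waitKey(delay) & 0xFF == ord('e'):
    break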

Code

https://github.com/Twenkid/Twenkid-FX-Studio/blob/master/Py/YoutubeAggregatorPafy/y6.py

# Merging Youtube streams with pafy,opencv and multithreading
# Base code by Fraser Langton - Thanks!
# Refactored and debugged by Twenkid
# version y6 - more cleaning of unused code,properly close VideoCapture in the processes

import multiprocessing #Process,Lock
from multiprocessing import Lock # Not needed
import cv2
import numpy as np
import pafy
import typing
import timeit
import time

urls = [
    "https://www.youtube.com/watch?v=tT0ob3cHPmE",
    "https://www.youtube.com/watch?v=XmjKODQYYfg",
    "https://www.youtube.com/watch?v=E2zrqzvtWio",
    "https://www.youtube.com/watch?v=6cQLNXELdtw",
    "https://www.youtube.com/watch?v=s_rmsH0wQ3g",
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",
    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A",
    "https://www.youtube.com/watch?v=39dZ5WhDlLE"
]

# Merging seems to require equal number of sides,so 2x2,3x3 etc. The  resolutions should be the same.
'''
[    
    "https://www.youtube.com/watch?v=C_9x0P0ebNc","https://www.youtube.com/watch?v=39dZ5WhDlLE",]
'''

width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920,1080

streams = []
#bestStreams = []

def main():
    global bestStreams
    streams = [pafy.new(url).getbest() for url in urls]
    print(streams)
    #[bestStreams for best in streams]
    #print(bestStreams)
    cv2.waitKey(0)
    videos = [cv2.VideoCapture() for _ in streams]
    bestURLS = [] 
    #[video.open(best.url) for video,best in zip(videos,streams)]  # Opened per process
    [bestURLS.append(best.url) for best in streams]
    
    #[ for video,streams)]
    print(bestURLS)
    cv2.waitKey(0)
    cv2.namedWindow('Video',cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video',cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN)
    LOCK = Lock()
    #proc = get_framesUL(bestStreams,LOCK)
    #proc,pipes = get_framesULJ(bestStreams,LOCK)
    proc,pipes = get_framesULJ(bestURLS,LOCK)     
    print("PROC,PIPES",proc,pipes)
    #cv2.waitKey(0)
    frames = []
    numStreams = len(streams)
    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1],(dim[0] // width,dim[1] // width)) for video in videos]
        #frames = get_frames(videos,LOCK)
        #frames = get_framesUL(streams,LOCK)
        
        
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        
        frames = [x.recv() for x in pipes]
        lf = len(frames)
        print("LEN(FRAMES)=",lf);
        #if lf<3: time.sleep(3); print("LEN(FRAMES)=",lf); #continue #Else merge and show
        #proc.join()
        #elif lf==3: frames = [x.recv() for x in pipes]
                
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)
         
        start_time = timeit.default_timer()      
        #if cv2!=None:
        try:
          cv2.imshow('Video',dst)
        except: print("Skip")
        #cv2.waitKey(1)  

        if cv2.waitKey(20) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)

        continue
        
    for p in proc:
        p.join()
        
    # [video.release() for video in videos] # Per process
    cv2.destroyAllWindows()



def get_framesULJ(videosURL,L): #return the processes,join in main and read the frames there
    # frames = [video.read()[-1] for video in videos]
    print("get_framesULJ:",videosURL)    
    jobs = []
    pipe_list = []
    #print("VIDEOS:",videosURL)    
    #for video in videos:
    for videoURL in videosURL: #urls:
        recv_end,send_end = multiprocessing.Pipe(False)
        print(recv_end,send_end)
        p = multiprocessing.Process(target=get_frame2L,args=(videoURL,send_end,L))
        #p = multiprocessing.Process(target=get_frame,args=(video,L))
        #if (p==None): continue
        print("P = ",p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS,len",jobs,len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list",pipe_list)               
        p.start()
        #cv2.waitKey(0)

    #for proc in jobs:
    #    proc.join()

    #frames = [x.recv() for x in pipe_list]
    #return frames
    #cv2.waitKey(0)
    return jobs,pipe_list

def get_frame2L(videoURL,send_end,L):
    v = cv2.VideoCapture()
    #L.acquire()
    v.open(videoURL)
    print("get_frame2",videoURL,v,send_end)
    #cv2.waitKey(0)
    while True:      
      ret,frame = v.read()
      if ret: send_end.send(frame); #cv2.imshow("FRAME",frame); cv2.waitKey(1)   
      else: print("NOT READ!"); break
    v.release()
    #L.release()
    
def get_framesUL(videosURL,L):
    # Older per-frame variant: a new process per frame, passing stream URLs instead of VideoCapture objects
    jobs = []
    pipe_list = []
    print("VIDEOS:",videosURL)
    for videoURL in videosURL:
        recv_end,send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame2,args=(videoURL,send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames


def get_frames(videos,L):
    # Original approach: a new process per frame, passing the VideoCapture objects themselves
    jobs = []
    pipe_list = []
    for video in videos:
        recv_end,send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame,args=(video,send_end,L))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()

    frames = [x.recv() for x in pipe_list]
    return frames
    
def get_frame(video,send_end,L):
    L.acquire()
    print("get_frame",video,send_end)
    send_end.send(video.read()[1])
    L.release()
    # send_end.send(cv2.resize(video.read()[1],dim[1] // width)))

    
def get_frame2(videoURL,send_end):
    v = cv2.VideoCapture(videoURL)
    while True:
      ret,frame = v.read()
      if ret: send_end.send(frame)
      else: break
    v.release()
      
    
def merge_frames(frames: typing.List[np.ndarray]):
    #cv2.imshow("FRAME0",frames[0]) ########## not images/small
    #cv2.imshow("FRAME1",frames[1]) ##########
    #cv2.imshow("FRAME2",frames[2]) ##########
    #cv2.imshow("FRAME3",frames[3]) ##########
    #cv2.waitKey(1)
    width = np.math.ceil(np.sqrt(len(frames)))
    rows = []
    for row in range(width):
        i1,i2 = width * row,width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    
    
    return np.vstack(rows)


if __name__ == '__main__':
    main()

Edit #1 idea: create one process per video stream and read it in a loop (pumping frames into the pipe), instead of creating a new process for every frame; and open the video / VideoCapture object inside the process from a videoURL passed to it, rather than sending the VideoCapture object itself. (I did not know whether this form would have the same pickle problem.)

...
in main:
bestURLS = []
[bestURLS.append(best.url) for best in streams]
proc,pipes = get_framesULJ(bestURLS,LOCK)


def get_frame2(videoURL,send_end):
    v = cv2.VideoCapture(videoURL)
    while True:
      ret,frame = v.read()
      if ret: send_end.send(frame)
      else: break


def get_framesULJ(videosURL,L): # return the processes, join in main and read the frames there
    print("get_framesULJ:",videosURL)
    jobs = []
    pipe_list = []
    for videoURL in videosURL:
        recv_end,send_end = multiprocessing.Pipe(False)
        print(recv_end,send_end)
        p = multiprocessing.Process(target=get_frame2L,args=(videoURL,send_end,L))
        print("P = ",p)
        jobs.append(p)
        print("JOBS,len",jobs,len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list",pipe_list)
        p.start()

    return jobs,pipe_list

Original answer:

<multiprocessing.connection.PipeConnection object at 0x000000000D3C7D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3BD2E0>
Traceback (most recent call last):
  File "y.py", line 104, in <module>
    main()
  File "y.py", line 48, in main
    frames = get_frames(videos)
  File "y.py", line 80, in get_frames
    p.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'cv2.VideoCapture' object

Z:\>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

It fails before p.start(). The instances are created and the structures look fine:

VIDEOS: [<VideoCapture 000000000D418710>, <VideoCapture 000000000D4186F0>, <VideoCapture 000000000D418B70>]
<multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3B62E0>
P =  <Process name='Process-1' parent=8892 initial>
JOBS,len [<Process name='Process-1' parent=8892 initial>] 1
RECV_END <multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90>

See the pickle module:

https://docs.python.org/3/library/pickle.html

It seems that not everything can be "pickled".

What can be pickled and unpickled?

The following types can be pickled:

None, True, and False

integers, floating point numbers, complex numbers

strings, bytes, bytearrays

tuples, lists, sets, and dictionaries containing only picklable objects

functions defined at the top level of a module (using def, not lambda)

built-in functions defined at the top level of a module

classes that are defined at the top level of a module

instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section Pickling Class Instances for details).

Also, there seems to be a bug in opencv that causes this. One of the suggested solutions is to turn off multiprocessing...

Python multiprocess can't pickle opencv videocapture object

https://github.com/MVIG-SJTU/AlphaPose/issues/164

Fang-Haoshu commented on 17 Oct 2018:

This bug is due to multiprocessing inside opencv. --sp disables multiprocessing. BTW, could you tell me the version of opencv that you are using?

I guess it is something to do with locked memory or the like.

One workaround I would try is to first dump the object's pixels as plain/raw data, possibly with a header describing the size etc.
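A sketch of that idea (the helper names are mine): send a small header with the shape and dtype, then the raw pixel buffer, and rebuild the array on the receiving side with numpy.frombuffer.

import numpy as np

def send_raw(send_end, frame):
    # Header first (shape and dtype as plain picklable values), then the raw bytes
    send_end.send((frame.shape, str(frame.dtype)))
    send_end.send_bytes(frame.tobytes())

def recv_raw(recv_end):
    shape, dtype = recv_end.recv()
    data = recv_end.recv_bytes()
    return np.frombuffer(data, dtype=dtype).reshape(shape)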

Also, in general, I think some buffering needs to be added for smoother playback.
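A sketch of such a buffer (illustrative names, and a placeholder URL): a bounded multiprocessing.Queue per stream lets the reader stay a few frames ahead of the display, and put() blocking on a full queue keeps it from running away.

import multiprocessing
import cv2

def buffered_reader(videoURL, frame_queue):
    # Keep up to maxsize frames queued; put() blocks while the buffer is full
    v = cv2.VideoCapture(videoURL)
    while True:
        ret, frame = v.read()
        if not ret:
            break
        frame_queue.put(frame)
    v.release()

if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=8)  # small per-stream buffer
    p = multiprocessing.Process(target=buffered_reader, args=("video.mp4", q))
    p.start()
    frame = q.get()  # the display loop would do this once per tile, per iteration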

By the way, which version of openCV do you have? Mine is 4.2.0.
