问题描述
我正在使用 pafy 流式传输一组 YouTube 视频,目的是将它们组合起来(分屏样式)并显示为一个视频。它正在工作,但在超过两个视频时帧速率非常慢,因为要依次从每个流中获取一帧;当我尝试 9 个视频(用于 3x3 拼接)时,获取一轮帧需要 0.1725 秒(太慢)。
我尝试使用管道和多重处理,但出现 EOFError:输入不足
请参阅下面的代码:通过注释/取消注释 `frames = ...` 那一行,可以在有效但缓慢的方法与我尝试的多进程方法之间切换。
import multiprocessing
import cv2
import numpy as np
import pafy
import typing
import timeit
# YouTube URLs to stream; merging assumes a square grid (here 3x3 = 9 streams).
urls = [
"https://www.youtube.com/watch?v=tT0ob3cHPmE","https://www.youtube.com/watch?v=XmjKODQYYfg","https://www.youtube.com/watch?v=E2zrqzvtWio","https://www.youtube.com/watch?v=6cQLNXELdtw","https://www.youtube.com/watch?v=s_rmsH0wQ3g","https://www.youtube.com/watch?v=QfhpNe6pOqU","https://www.youtube.com/watch?v=C_9x0P0ebNc","https://www.youtube.com/watch?v=Ger6gU_9v9A","https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
# Grid side length: smallest integer whose square holds all streams (3 for 9 urls).
width = np.math.ceil(np.sqrt(len(urls)))
# Target canvas resolution (width, height) used when resizing per-stream frames.
dim = 1920,1080
def main():
    """Open the best stream for each YouTube URL and show them merged as a grid.

    Pressing 'e' in the video window exits; all captures are released on exit.
    """
    # Resolve the best-quality stream object per URL (network access via pafy).
    streams = [pafy.new(url).getbest() for url in urls]
    # One capture per stream.  (The original comprehension shadowed `streams`
    # with `for streams in streams`; it worked but was confusing.)
    videos = [cv2.VideoCapture() for _ in streams]
    for video, best in zip(videos, streams):
        video.open(best.url)
    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    # BUG FIX: the original called cv2.setwindowProperty (lowercase 'w'),
    # which raises AttributeError; the correct name is setWindowProperty.
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    while True:
        start_time = timeit.default_timer()
        # Slow-but-working sequential alternative, kept from the original:
        # frames = [cv2.resize(video.read()[-1],(dim[0] // width,dim[1] // width)) for video in videos]
        frames = get_frames(videos)
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)

        start_time = timeit.default_timer()
        cv2.imshow('Video', dst)
        if cv2.waitKey(1) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)
    # Release every capture before tearing down the window.
    [video.release() for video in videos]
    cv2.destroyAllWindows()
def get_frames(videos):
    """Grab one frame per capture, using one child process per capture.

    NOTE(review): this is the approach the question asks about.  On Windows
    (spawn start method) multiprocessing pickles the Process args for the
    child; cv2.VideoCapture cannot be pickled, so the parent raises
    TypeError and the child dies before sending — which then surfaces as
    "EOFError: Ran out of input" on the receiving side.
    """
    # frames = [video.read()[-1] for video in videos]
    jobs = []
    pipe_list = []
    for video in videos:
        # One one-way pipe per stream; the child sends a single frame back.
        recv_end,send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame,args=(video,send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()
    for proc in jobs:
        proc.join()
    # Blocks until every child has delivered its frame (or raises EOFError
    # if a child died before sending anything).
    frames = [x.recv() for x in pipe_list]
    return frames
def get_frame(video,send_end):
send_end.send(video.read()[1])
# send_end.send(cv2.resize(video.read()[1],dim[1] // width)))
def merge_frames(frames: typing.List[np.ndarray]):
    """Tile *frames* row-major into a near-square grid and return one image.

    All frames must share the same height/width/channels.  If the count is
    not a perfect square, the last row is padded with black frames so
    np.vstack receives rows of equal width (the original crashed there).

    Raises ValueError when *frames* is empty.
    """
    import math  # local import so this fix is self-contained

    if not frames:
        raise ValueError("merge_frames() needs at least one frame")
    # np.math is deprecated (removed in NumPy 2.0); use the stdlib instead.
    side = math.ceil(math.sqrt(len(frames)))
    blank = np.zeros_like(frames[0])
    padded = list(frames) + [blank] * (side * side - len(frames))
    rows = [np.hstack(padded[r * side:(r + 1) * side]) for r in range(side)]
    return np.vstack(rows)
# Entry guard: required on Windows so multiprocessing's spawned children
# can re-import this module without re-running main().
if __name__ == '__main__':
    main()
解决方法
有趣的应用程序!关于错误:我运行了你的代码,错误消息是无法 pickle(序列化)VideoCapture 对象,请参阅下面的链接,这可能就是接收端管道为空的原因。两个进程各报一个错误:父进程先报 pickle 错误,子进程随后报 EOF。
编辑 #2:我设法用每个视频等一个进程运行它:
关于性能,我首先没有合并图像(我必须修复一些细节)以查看它是否收到,并且对于 3 和 4 帧,显示在与接收线程分开的窗口中,它播放速度非常快,比实时(用 3-4 个流测试)。我认为显示的合并和调整大小很慢,4 个流 (4x1280x720) 的图片为 2560x1440。就我而言,它已调整大小以适合屏幕。
感谢分享这个问题和这个库等!
(顺便说一句,我最初也尝试过使用锁,但碰巧并不必要。代码还需要清理一些实验性内容。此外,当前的实现可能无法对每个流做到逐帧同步,因为它不像你的原始示例那样每帧都 join——原始示例是为每一帧创建新进程、各抓取一帧然后合并。)
CPU负载主要在主进程 (4 核 CPU,因此每个实例最大值 = 25%):
有时:
0.06684677699999853 0.030737616999999773 1.2829999995744856e-06 LEN(FRAMES)= 9 0.06703700200000284 0.030708104000002123 6.409999997458726e-07 LEN(FRAMES)= 9
主循环中的waitKey可以调整。
代码
https://github.com/Twenkid/Twenkid-FX-Studio/blob/master/Py/YoutubeAggregatorPafy/y6.py
# Merging Youtube streams with pafy,opencv and multithreading
# Base code by Fraser Langton - Thanks!
# Refactored and debugged by Twenkid
# version y6 - more cleaning of unused code,properly close VideoCapture in the processes
import multiprocessing #Process,Lock
from multiprocessing import Lock # Not needed
import cv2
import numpy as np
import pafy
import typing
import timeit
import time
# YouTube URLs to stream; see the note below about square grids.
urls = [
"https://www.youtube.com/watch?v=tT0ob3cHPmE","https://www.youtube.com/watch?v=XmjKODQYYfg","https://www.youtube.com/watch?v=E2zrqzvtWio","https://www.youtube.com/watch?v=6cQLNXELdtw","https://www.youtube.com/watch?v=s_rmsH0wQ3g","https://www.youtube.com/watch?v=QfhpNe6pOqU","https://www.youtube.com/watch?v=C_9x0P0ebNc","https://www.youtube.com/watch?v=Ger6gU_9v9A","https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
# Merging seems to require equal number of sides,so 2x2,3x3 etc. The resolutions should be the same.
'''
[
"https://www.youtube.com/watch?v=C_9x0P0ebNc","https://www.youtube.com/watch?v=39dZ5WhDlLE",]
'''
# Grid side length and target canvas resolution (width, height).
width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920,1080
# Module-level list of pafy stream objects (populated in main()).
streams = []
#bestStreams = []
def main():
    """Spawn one long-lived reader process per stream and show the merged grid.

    Each child opens its own VideoCapture (only the URL string is pickled to
    it) and pumps frames through a pipe; this loop receives one frame per
    pipe, merges them and displays the result.  Press 'e' to exit.
    """
    global bestStreams
    # Resolve best-quality stream metadata per URL (network access via pafy).
    streams = [pafy.new(url).getbest() for url in urls]
    print(streams)
    cv2.waitKey(0)
    # Only the plain URL strings are handed to the children — a
    # cv2.VideoCapture object cannot be pickled across process boundaries.
    bestURLS = [best.url for best in streams]
    print(bestURLS)
    cv2.waitKey(0)
    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    LOCK = Lock()  # kept from experimentation; the readers turned out not to need it
    procs, pipes = get_framesULJ(bestURLS, LOCK)
    print("PROC,PIPES", procs, pipes)
    while True:
        start_time = timeit.default_timer()
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        # Block until every reader has delivered its next frame.
        frames = [pipe.recv() for pipe in pipes]
        print("LEN(FRAMES)=", len(frames))
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        try:
            cv2.imshow('Video', dst)
        except Exception:
            print("Skip")
        if cv2.waitKey(20) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)
    # BUG FIX: the original iterated an undefined name `jobs` here, raising
    # NameError on exit; join the processes actually returned above.
    for proc in procs:
        proc.join()
    cv2.destroyAllWindows()
def get_framesULJ(videosURL, L):
    """Start one frame-reader process per stream URL.

    Returns (processes, receive_pipes): the caller reads frames from the
    pipes each iteration and joins the processes on shutdown.
    """
    print("get_framesULJ:", videosURL)
    procs, receivers = [], []
    for url in videosURL:
        # One one-way pipe per stream; the URL (a plain string) is pickled
        # to the child instead of an unpicklable cv2.VideoCapture object.
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        worker = multiprocessing.Process(target=get_frame2L, args=(url, send_end, L))
        print("P = ", worker)
        procs.append(worker)
        print("JOBS,len", procs, len(procs))
        receivers.append(recv_end)
        print("pipe_list", receivers)
        worker.start()
    return procs, receivers
def get_frame2L(videoURL, send_end, L):
    """Child-process worker: open *videoURL* and pump every frame into the pipe.

    BUG FIX: the published signature was (videoURL, L), but both the body
    and the call site (Process(..., args=(videoURL, send_end, L))) use
    `send_end`, so the parameter is restored here.  The lock `L` is accepted
    but unused (kept from the answer's experiments).
    """
    # The capture is created inside the child, avoiding pickling it.
    v = cv2.VideoCapture()
    v.open(videoURL)
    print("get_frame2", videoURL, v, send_end)
    while True:
        ret, frame = v.read()
        if ret:
            send_end.send(frame)
        else:
            print("NOT READ!")
            break
    # Per the file header ("properly close VideoCapture in the processes").
    v.release()
def get_framesUL(videosURL, L):
    """Per-frame variant: spawn one process per URL, join, collect one frame each.

    NOTE(review): the published listing of this function was truncated — it
    called `p.start()` on an undefined `p`.  The loop body below is
    reconstructed to mirror get_framesULJ, using the per-URL worker
    get_frame2 that opens the capture inside the child.
    """
    jobs = []
    pipe_list = []
    print("VIDEOS:", videosURL)
    for videoURL in videosURL:
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame2, args=(videoURL, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()
    for proc in jobs:
        proc.join()
    frames = [x.recv() for x in pipe_list]
    return frames
def get_frames(videos, L):
    """Original per-frame approach: one process per already-open VideoCapture.

    NOTE(review): kept only for reference — this fails on Windows (spawn
    start method) because cv2.VideoCapture cannot be pickled (TypeError in
    the parent, surfacing as EOFError in the child).  The published listing
    was garbled (`def get_frames(videos,videos)`); reconstructed from the
    question's version plus the lock-taking worker get_frame.
    """
    jobs = []
    pipe_list = []
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()
    for proc in jobs:
        proc.join()
    frames = [x.recv() for x in pipe_list]
    return frames
def get_frame(video,L):
L.acquire()
print("get_frame",video,send_end)
send_end.send(video.read()[1])
L.release()
# send_end.send(cv2.resize(video.read()[1],dim[1] // width)))
def get_frame2(videoURL, send_end):
    """Worker: open *videoURL* in this process and pump every frame into the pipe.

    BUG FIX: the original did `v = video.open(videoURL)` where `video` is
    undefined (and VideoCapture.open returns a bool, not the capture); the
    capture is created here instead.
    """
    v = cv2.VideoCapture(videoURL)
    while True:
        ret, frame = v.read()
        if ret:
            send_end.send(frame)
        else:
            break
    # Per the file header ("properly close VideoCapture in the processes").
    v.release()
def merge_frames(frames: typing.List[np.ndarray]):
    """Tile *frames* row-major into a near-square grid and return one image.

    All frames must share the same height/width/channels.  If the count is
    not a perfect square, the last row is padded with black frames so
    np.vstack receives rows of equal width (the original crashed there).

    Raises ValueError when *frames* is empty.
    """
    import math  # local import so this fix is self-contained

    if not frames:
        raise ValueError("merge_frames() needs at least one frame")
    # np.math is deprecated (removed in NumPy 2.0); use the stdlib instead.
    side = math.ceil(math.sqrt(len(frames)))
    blank = np.zeros_like(frames[0])
    padded = list(frames) + [blank] * (side * side - len(frames))
    rows = [np.hstack(padded[r * side:(r + 1) * side]) for r in range(side)]
    return np.vstack(rows)
# Entry guard: required on Windows so multiprocessing's spawned children
# can re-import this module without re-running main().
if __name__ == '__main__':
    main()
编辑 #1 想法:为每个视频流创建一个进程,并在循环中持续读取帧(不断泵入管道),而不是为每一帧创建新进程;同时只把 videoURL 字符串传给子进程,由子进程自行打开 VideoCapture 对象,而不是通过管道发送 VideoCapture 对象。(当时不确定这种形式是否也有同样的 pickle 问题。)
...
in main:
    bestURLS = []
    [bestURLS.append(best.url) for best in streams]
    proc, pipes = get_framesULJ(bestURLS, LOCK)

def get_frame2(videoURL, send_end):
    v = cv2.VideoCapture(videoURL)
    while True:
        ret, frame = v.read()
        if ret: send_end.send(frame)
        else: break
def get_framesULJ(videosURL, L):  # return the processes, join in main and read the frames there
    print("get_framesULJ:", videosURL)
    jobs = []
    pipe_list = []
    for videoURL in videosURL:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        print("P = ", p)
        jobs.append(p)
        print("JOBS,len", jobs, len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)
        p.start()
    return jobs, pipe_list
原答案:
<multiprocessing.connection.PipeConnection object at 0x000000000D3C7D90> <multip
rocessing.connection.PipeConnection object at 0x000000000D3BD2E0>
Traceback (most recent call last):
File "y.py",line 104,in <module>
main()
File "y.py",line 48,in main
frames = get_frames(videos)
File "y.py",line 80,in get_frames
p.start()
File "C:\Program Files\Python38\lib\multiprocessing\process.py",line 121,in
start
self._popen = self._Popen(self)
File "C:\Program Files\Python38\lib\multiprocessing\context.py",line 224,in
_Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Program Files\Python38\lib\multiprocessing\context.py",line 326,in
_Popen
return Popen(process_obj)
File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py",lin
e 93,in __init__
reduction.dump(process_obj,to_child)
File "C:\Program Files\Python38\lib\multiprocessing\reduction.py",line 60,in
dump
ForkingPickler(file,protocol).dump(obj)
TypeError: cannot pickle 'cv2.VideoCapture' object
Z:\>Traceback (most recent call last):
File "<string>",line 1,in <module>
File "C:\Program Files\Python38\lib\multiprocessing\spawn.py",line 116,in sp
awn_main
exitcode = _main(fd,parent_sentinel)
File "C:\Program Files\Python38\lib\multiprocessing\spawn.py",line 126,in _m
ain
self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
它在 p.start 之前失败。 实例已创建并且结构看起来不错:
VIDEOS: [<VideoCapture 000000000D418710>,<VideoCapture 000000000D4186F0>,<Vide
oCapture 000000000D418B70>]
<multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90> <multip
rocessing.connection.PipeConnection object at 0x000000000D3B62E0>
P = <Process name='Process-1' parent=8892 initial>
JOBS,len [<Process name='Process-1' parent=8892 initial>] 1
RECV_END <multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90
>
查看模块pickle:
https://docs.python.org/3/library/pickle.html
似乎并非所有对象都可以被 pickle(序列化)。
哪些类型可以被 pickle,哪些不能?
以下类型可以被 pickle:
None,True,and False
integers,floating point numbers,complex numbers
strings,bytes,bytearrays
tuples,lists,sets,and dictionaries containing only picklable objects
functions defined at the top level of a module (using def,not lambda)
built-in functions defined at the top level of a module
classes that are defined at the top level of a module
instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section Pickling Class Instances for details).
此外,opencv 中似乎有一个错误导致了这一点。给出的解决方案之一是关闭多处理...
Python multiprocess can't pickle opencv videocapture object
https://github.com/MVIG-SJTU/AlphaPose/issues/164
方浩舒于2018年10月17日发表评论
这个bug是由于opencv中的多处理造成的。 --sp 禁用 多处理。顺便说一句,你能告诉我你的opencv版本吗? 正在使用?
我猜是关于锁定内存什么的。
我尝试的一种解决方法是首先将对象的像素转储为纯数据或原始数据,可能带有关于大小等的标题。
此外,总的来说,为了更流畅的播放,我认为需要添加一些缓冲。
顺便说一句,你的 openCV 是什么版本?我的是4.2.0