问题描述
我正在尝试开发一个图像处理管道,将视频上传到一个 GCS 存储桶,将所有帧提取为 jpg 图像,然后将这些图像上传到另一个 GCS 存储桶。我正在使用 PubSub 推送订阅来触发云运行服务。不幸的是,该服务无法在推送订阅的 10 分钟最大请求响应超时时间内可靠地处理视频。我已经跟踪了这个问题,似乎将帧上传到 GCS 导致了瓶颈。这些视频平均包含大约 28000 帧(30FPS,长度约 15 分钟)。我认为这应该在提供的时间内是可能的。所有服务都在同一地区/地区。
有没有办法提高这些 GCS blob 上传的吞吐量?当我使用 gsutil 将视频 blob 从存储桶复制到另一个存储桶(在同一区域内)时,需要几秒钟的时间。
我尝试过增加/减少线程数、增加服务 cpu 数和增加服务内存数。我看不出任何变化。 writes over 1000/Sec 的 GCS 速率限制,但我认为我还没有接近这个限制。
我的服务复制 main.py
脚本作为 Google Cloud Run Vision Tutorial 的一部分。唯一的修改是在 video.py
中更改对我的处理例程的调用。我已经包括在帖子的底部。 video.py
运行实际处理。
Cloud Run 服务配备 1 个 cpu、512 MiB、15 分钟超时 Cloud PubSub 订阅(推送订阅) 10 分钟超时(最大值)
video.py:
import os
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor
import cv2
from google.cloud import storage
from google.oauth2 import service_account
def upload(blob : storage.blob.Blob,buf : "numpy.ndarray"):
blob.upload_from_string(buf.tobytes(),content_type="image/jpeg")
def process(data : dict):
src_client = storage.Client()
src_bucket = src_client.get_bucket(data["bucket"])
src_blob = src_bucket.get_blob(data["name"])
pathname = os.path.dirname(data["name"])
basename,ext = os.path.splitext(os.path.basename(data["name"]))
signing_creds = \
service_account.Credentials.from_service_account_file("key.json")
url = src_blob.generate_signed_url(
credentials=signing_creds,version="v4",expiration=timedelta(minutes=20),method="GET"
)
count = extract_frames(url,basename,pathname)
def extract_frames(
signed_url : str,basename : str,pathname : str,dst_bucket_name : str = "extracted-frames"
) -> int:
dst_client = storage.Client()
dst_bucket = dst_client.get_bucket(dst_bucket_name)
count = 0
vid = cv2.VideoCapture(signed_url)
with ThreadPoolExecutor() as executor:
ret,frame = vid.read()
while ret:
enc_ret,buf = cv2.imencode(".jpg",frame)
if not enc_ret:
msg = f'Bad Encoding [Frame: {count:06}]'
else:
blob_name = f"{pathname}/{basename}-{count:06}.jpg"
blob = dst_bucket.blob(blob_name)
executor.map(upload,(blob,buf))
count += 1
ret,frame = vid.read()
vid.release()
return count
main.py:
import base64
import json
import os
from flask import Flask,request
# import image
import video
app = Flask(__name__)
@app.route("/",methods=["POST"])
def index():
envelope = request.get_json()
if not envelope:
msg = "no Pub/Sub message received"
print(f"error: {msg}")
return f"Bad Request: {msg}",400
if not isinstance(envelope,dict) or "message" not in envelope:
msg = "invalid Pub/Sub message format"
print(f"error: {msg}")
return f"Bad Request: {msg}",400
# Decode the Pub/Sub message.
pubsub_message = envelope["message"]
if isinstance(pubsub_message,dict) and "data" in pubsub_message:
try:
data = json.loads(base64.b64decode(pubsub_message["data"]).decode())
except Exception as e:
msg = (
"Invalid Pub/Sub message: "
"data property is not valid base64 encoded JSON"
)
print(f"error: {e}")
return f"Bad Request: {msg}",400
# Validate the message is a Cloud Storage event.
if not data["name"] or not data["bucket"]:
msg = (
"Invalid Cloud Storage notification: "
"expected name and bucket properties"
)
print(f"error: {msg}")
return f"Bad Request: {msg}",400
try:
# image.blur_offensive_images(data)
video.process(data)
return ("",204)
except Exception as e:
print(f"error: {e}")
return ("",500)
return ("",500)
解决方法
您的情况下的模式,如果您有更长的时间,则可以扩展。未来的视频,首先是将视频分成较小的序列(比如 3 或 5 分钟的视频),然后将这些序列存储在 Cloud Storage 中。
然后一个新事件运行一个新服务(或相同,根据您的设计),然后您提取所有图像。如果您需要一个唯一的来源,您可以将您的文件命名为具有相同的前缀,从而能够在后续过程中重复使用它。
与并行化的想法相同,您还可以想象利用 Cloud Run 的多 CPU 容量在同一实例中并行处理视频块。