问题描述
我正在尝试将云功能与运行在处理传入数据的Compute Engine上的AI程序连接起来。当前,Cloud Function是由Cloud Storage存储桶中的更改触发的,并且每次上传数据时都会启动VM,但是当同时上传大量数据时,实例将重新启动几次,从而中断数据处理。如何将传入数据排队,并让Compute Engine实例在以前的过程完成时读取并处理该传入数据?我看过发布/订阅和云任务,但是这些实现提到扩展多个实例或连接到App Engine。是否需要修改分析数据的过程以按顺序处理数据,或者可以使用Google Cloud工具来完成?
编辑
附加的是在GCS存储桶中上传文件时运行的云功能。请注意,实例触发的方式总是相同的,我的理论是后续的触发方式会重写元数据,并立即重新启动实例或更改该实例的指令,从而中断对先前文件的处理。
import os
from googleapiclient.discovery import build
def start(event,context):
file = event
print(file["id"])
string = file["id"]
newstring = string.split('/')
userId = newstring[1]
paymentId = newstring[2]
name = newstring[3]
print(name)
if name == "uploadcomplete.txt":
startup_script = """#! /bin/bash
cd ~ && pwd 1>>/var/log/log.out 2>&1
PATH=$PATH://usr/local/cuda 1>>/var/log/log.out 2>&1
cd program_directory 1>>/var/log/log.out 2>&1
source /opt/anaconda3/etc/profile.d/conda.sh 1>/var/log/log.out 2>&1
conda activate env
cd keras-retinanet/ 1>>/var/log/log.out 2>&1
export PYTHONPATH=`pwd` 1>>/var/log/log.out 2>&1
cd tracker 1>>/var/log/log.out 2>&1
python program_name --gcs_input_path gs://input/{userId}/{paymentId} --gcs_output_path gs://output/{userId}/{paymentId} 1>>/var/log/log.out 2>&1
sudo python3 gcs_to_mongo.py {userId} {paymentId} 1>>/var/log/log.out 2>&1
sudo shutdown -P now
""".format(userId=userId,paymentId=paymentId)
service = build('compute','v1',cache_discovery=False)
print('VM Instance starting')
project = 'XXXX'
zone = 'us-east1-c'
instance = 'YYYY'
metadata = service.instances().get(project=project,zone=zone,instance=instance)
metares = metadata.execute()
print(metares)
fingerprint = metares["metadata"]["fingerprint"]
print(fingerprint)
bodydata = {"fingerprint": fingerprint,"items": [{"key": "startup-script","value": startup_script}]}
print(bodydata)
meta = service.instances().setMetadata(project=project,instance=instance,body=bodydata)
res = meta.execute()
instanceget = service.instances().get(project=project,instance=instance).execute()
request = service.instances().start(project=project,instance=instance)
response = request.execute()
print('VM Instance started')
print(instanceget)
print("'New Metadata:",instanceget['metadata'])
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)