Google Compute Engine: queuing incoming data from Cloud Functions

Problem description

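I'm trying to connect a Cloud Function to an AI program that runs on Compute Engine and processes incoming data. Currently the Cloud Function is triggered by changes in a Cloud Storage bucket and starts the VM every time data is uploaded. When a lot of data is uploaded at the same time, however, the instance restarts several times, which interrupts the data processing. How can I queue the incoming data and have the Compute Engine instance read and process it once the previous process has finished? I've looked at Pub/Sub and Cloud Tasks, but their documentation talks about scaling to multiple instances or connecting to App Engine. Do I need to change the process that analyzes the data so it handles the data sequentially, or can this be done with Google Cloud tools?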
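The behaviour being asked for (triggers only add work; one worker processes it strictly in order) can be modelled in plain Python. This is a minimal in-process sketch, not Google Cloud code: `queue.Queue` stands in for a managed queue such as a Pub/Sub subscription, and `process`, `worker` and `on_upload` are hypothetical names, with `process` as a placeholder for the AI program.

```python
import queue
import threading

jobs = queue.Queue()  # stand-in for a managed queue (assumption: e.g. a Pub/Sub subscription)
processed = []        # records the order in which jobs finished


def process(user_id, payment_id):
    # Hypothetical placeholder for the long-running AI program on the VM.
    processed.append((user_id, payment_id))


def worker():
    # One consumer handles jobs strictly one at a time, in arrival order, so
    # a burst of uploads waits in the queue instead of interrupting a run.
    while True:
        job = jobs.get()
        if job is None:  # sentinel: no more work
            break
        process(*job)


def on_upload(user_id, payment_id):
    # What each trigger does in this model: enqueue a job, never restart the worker.
    jobs.put((user_id, payment_id))


consumer = threading.Thread(target=worker)
consumer.start()
for i in range(3):  # three uploads arriving at once
    on_upload("user1", f"pay{i}")
jobs.put(None)
consumer.join()
print(processed)  # [('user1', 'pay0'), ('user1', 'pay1'), ('user1', 'pay2')]
```

With a single consumer, throughput is limited to one job at a time; that is the trade-off for never running two jobs on the same VM concurrently.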

Edit

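Attached is the Cloud Function that runs when a file is uploaded to the GCS bucket. Note that the instance is always triggered the same way. My theory is that subsequent triggers overwrite the metadata and either restart the instance immediately or change its instructions, which interrupts the processing of the previous file.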

import os
from googleapiclient.discovery import build


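def start(event, context):
    # Cloud Storage trigger: event["id"] is "<bucket>/<object name>/<generation>",
    # with object names laid out here as "<userId>/<paymentId>/<file name>".
    file = event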
    print(file["id"])

    string = file["id"]

    newstring = string.split('/')
    userId = newstring[1]
    paymentId = newstring[2]
    name = newstring[3]

    print(name)

    if name == "uploadcomplete.txt":
        startup_script = """#! /bin/bash
                cd ~ && pwd 1>>/var/log/log.out 2>&1
                PATH=$PATH://usr/local/cuda 1>>/var/log/log.out 2>&1
                cd program_directory 1>>/var/log/log.out 2>&1
                source /opt/anaconda3/etc/profile.d/conda.sh 1>>/var/log/log.out 2>&1
                conda activate env
                cd keras-retinanet/ 1>>/var/log/log.out 2>&1
                export PYTHONPATH=`pwd` 1>>/var/log/log.out 2>&1
                cd tracker 1>>/var/log/log.out 2>&1
                python program_name --gcs_input_path gs://input/{userId}/{paymentId} --gcs_output_path gs://output/{userId}/{paymentId} 1>>/var/log/log.out 2>&1
                sudo python3 gcs_to_mongo.py {userId} {paymentId} 1>>/var/log/log.out 2>&1
                sudo shutdown -P now
                """.format(userId=userId,paymentId=paymentId)
                
        service = build('compute', 'v1', cache_discovery=False)
        print('VM Instance starting')
        project = 'XXXX'
        zone = 'us-east1-c'
        instance = 'YYYY'
        # Read the instance; setMetadata requires its current metadata fingerprint.
        metadata = service.instances().get(project=project, zone=zone, instance=instance)
        metares = metadata.execute()
        print(metares)

        fingerprint = metares["metadata"]["fingerprint"]
        print(fingerprint)
        # Replaces the instance's metadata items with this single startup script.
        bodydata = {"fingerprint": fingerprint, "items": [{"key": "startup-script", "value": startup_script}]}

        print(bodydata)
        # zone is a required parameter for setMetadata, get and start.
        meta = service.instances().setMetadata(project=project, zone=zone, instance=instance, body=bodydata)
        res = meta.execute()
        instanceget = service.instances().get(project=project, zone=zone, instance=instance).execute()
        # Boot the VM; its startup script runs on boot.
        request = service.instances().start(project=project, zone=zone, instance=instance)
        response = request.execute()
        print('VM Instance started')
        print(instanceget)
        print("New Metadata:", instanceget['metadata'])
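The theory about overwritten metadata can be checked with a small in-memory model. This is a minimal sketch, assuming only what `start` above does: read the instance, take its current fingerprint, then replace the metadata with a new `startup-script`. `FakeInstance` and `trigger` are hypothetical names, not Compute Engine API calls; the fingerprint check is modelled on the optimistic locking that `setMetadata` uses, not on its actual implementation.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class FakeInstance:
    """Hypothetical in-memory stand-in for one VM's metadata (not the Compute Engine API)."""
    fingerprint: str = field(default_factory=lambda: uuid.uuid4().hex)
    items: dict = field(default_factory=dict)

    def set_metadata(self, fingerprint, items):
        # Optimistic locking: a write based on a stale read is rejected.
        if fingerprint != self.fingerprint:
            raise RuntimeError("stale fingerprint")
        # A successful write replaces all items and issues a new fingerprint.
        self.items = dict(items)
        self.fingerprint = uuid.uuid4().hex


def trigger(vm, payment_id):
    # Mirrors start(): read the current fingerprint, then overwrite startup-script.
    current = vm.fingerprint
    vm.set_metadata(current, {"startup-script": f"# process {payment_id}"})


vm = FakeInstance()
for payment_id in ["p1", "p2", "p3"]:
    trigger(vm, payment_id)

# Every write succeeded, but only the last upload's job is left to run.
print(vm.items["startup-script"])  # prints "# process p3"

# Only a write based on an out-of-date fingerprint is refused.
stale = vm.fingerprint
trigger(vm, "p4")
try:
    vm.set_metadata(stale, {"startup-script": "# process p5"})
except RuntimeError as err:
    print("rejected:", err)
```

Because each trigger reads a fresh fingerprint immediately before writing, the fingerprint does not guard against this pattern: every write succeeds and the earlier jobs' scripts are simply replaced.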
