如何最好地处理来自谷歌长期运行的云功能操作的异步响应

问题描述

我正在使用 Google Cloud Functions (python) 通过调用 exportAssets() 方法 here 从 GCP 启动资产清单导出。该方法返回一个定义了 here 的 Operations 对象,该对象可用于轮询操作,直到操作完成。当然,因为这是一个功能,我被限制在 540 秒内,所以不能永远这样做。 Google Api python 客户端提供了 add_done_callback() 方法,可以在其中等待异步响应,但据我所知,它需要我在云函数中保持线程处于活动状态。有没有办法告诉执行操作的资产清单 API 将 aync 响应(成功或失败)发送到我可以正确处理响应的 pubsub 主题?试图避免使用 basic_scaling 启动 appengine 实例以支持 24 小时超时。

    from google.cloud import asset_v1
    # .....
    # Setup request to asset inventory API
    parent = "organizations/{}".format(GCP_ORGANIZATION)
    requested_type = 'RESOURCE'
    dataset = 'projects/{}/datasets/gcp_assets_{}'.format(GCP_PROJECT,requested_type)
    partition_spec = asset_v1.PartitionSpec
    partition_key = asset_v1.PartitionSpec.PartitionKey.REQUEST_TIME
    partition_spec.partition_key = asset_v1.PartitionSpec.PartitionKey.REQUEST_TIME

    output_config = asset_v1.OutputConfig()
    output_config.bigquery_destination.dataset = dataset
    output_config.bigquery_destination.table = 'assets'
    output_config.bigquery_destination.separate_tables_per_asset_type = True
    output_config.bigquery_destination.partition_spec.partition_key = partition_key

    # Make API request to asset inventory API
    print("Creating job to load 'asset types: {}' to {}".format(
        requested_type,dataset
    ))
    response = ASSET_CLIENT.export_assets(
        request={
            "parent": parent,"content_type": content_type,"output_config": output_config,}
    )
    print(response.result())  # This waits for the job to complete

解决方法

云资产清单导出不会在导出结束时提供 PubSub 通知。但是,在我之前的公司,导出100k+资产大约需要5分钟;也不是那么坏!如果您有更多资产,我相信您可以联系 Google Cloud(使用您的客户工程师)在路线图中添加此通知。


无论如何,如果您想构建一个解决方法,您可以使用 workflows

  • 使用 Cloud Functions 函数触发您的工作流程
  • 在您的工作流程中,
    • 调用 Cloud Asset API 将数据导出到 BigQuery
    • 获取响应并执行循环(测试导出作业状态,如果不正常,则休眠 X 秒并再次测试)
    • 作业结束后,调用 PubSub API(或直接调用 Cloud Functions 函数)提交作业状态并进行处理。