通过Google Cloud功能将大量任务排队

问题描述

我正在尝试通过每天调用一次外部API使用云功能来更新数据。

到目前为止,我有

问题在于每天有约2k个项目需要更新,并且云功能会在此之前超时,因此为什么我将它们放在队列中。但是,即使将项目放入队列中,对于云功能来说也花费了太长时间,因此在将其全部添加之前就已经超时了。

是否有一种简单的方法可以一次将多个任务批量添加到队列中?

是否通过失败,更好的解决方案?

全部用python编写

功能1的代码

def refresh(request):
    for i in items:
        # Create a client.
        client = tasks_v2.CloudTasksClient()

        # Todo(developer): Uncomment these lines and replace with your values.
        project = 'my-project'
        queue = 'refresh-queue'
        location = 'europe-west2'
        name = i['name'].replace(' ','')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name={name}"

        # Construct the fully qualified queue name.
        parent = client.queue_path(project,location,queue)

        # Construct the request body.
        task = {
            "HTTP_Request": {  # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,"url": url,# The full url path that the task will be sent to.
            }
        }
        

        # Use the client to build and send the task.
        response = client.create_task(request={"parent": parent,"task": task})

解决方法

回答您的问题“是否有一种简单的方法可以一次将多个任务批量添加到队列?”根据公众的documentation,最好的方法是实施双重注入模式。

为此,您将有一个新队列,您将在其中添加一个包含原始队列的多个任务的单个任务,然后在该队列的接收端,您将有一个获取此数据的服务。任务,并在第二个队列中为每个条目创建一个任务。

此外,我建议您将500/50/5模式用于冷队列。这将有助于任务队列和Cloud Function服务以安全比率提高。

,

Chris32的答案是正确的,但是我在您的代码片段中注意到的一件事是您应该在for循环之外创建客户端。

def refresh(request):
    # Create a client.
    client = tasks_v2.CloudTasksClient()

    # TODO(developer): Uncomment these lines and replace with your values.
    project = 'my-project'
    queue = 'refresh-queue'
    location = 'europe-west2'
    for i in items:
        name = i['name'].replace(' ','')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name={name}"

        # Construct the fully qualified queue name.
        parent = client.queue_path(project,location,queue)

        # Construct the request body.
        task = {
            "http_request": {  # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,"url": url,# The full url path that the task will be sent to.
            }
        }
        

        # Use the client to build and send the task.
        response = client.create_task(request={"parent": parent,"task": task})

在应用引擎中,我将在client = tasks_v2.CloudTasksClient()之外在文件级别执行def refresh,但是我不知道这对云功能是否重要。

第二件事,

修改“功能2”以采用多个“名称”,而不只是一个。然后可以在“功能1”中一次向“功能2”发送10个名称

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # Create a client.
    client = tasks_v2.CloudTasksClient()
    # ...
    for i in range(0,len(items),BATCH_SIZE)]:
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([i['name'].replace(' ','') for i in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?names={names}"

        # Construct the fully qualified queue name.
        # ...

如果这2个快速修复方法不起作用,则必须将“功能1”分为“功能1A”和“功能1B”

功能1A:

BATCH_SIZE = 100  # send 100 names to Function 1B

def refresh(request):
    client = tasks_v2.CloudTasksClient()
    for i in range(0,'') for i in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-1b?names={names}"

        # send the task.
        response = client.create_task(request={
            "parent": client.queue_path('my-project','europe-west2','refresh-queue'),"task": {
                "http_request": {"http_method": tasks_v2.HttpMethod.GET,"url": url}
        }})

功能1B:

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # set `names` equal to the query param `names`
    client = tasks_v2.CloudTasksClient()
    for i in range(0,len(names),BATCH_SIZE)]:
        names_batch = ','.join(names[i:i + BATCH_SIZE])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-2?names={names_batch}"

        # send the task.
        response = client.create_task(request={
            "parent": client.queue_path('my-project',"url": url}
        }})