Python 3 asyncio with aioboto3 seems sequential

Problem description

I am porting a simple Python 3 script to AWS Lambda. The script is simple: it gathers information from a dozen S3 objects and returns the results.

The script used multiprocessing.Pool to collect all the files in parallel, but multiprocessing cannot be used in an AWS Lambda environment because /dev/shm is missing there. So rather than writing an ugly multiprocessing.Process / multiprocessing.Queue replacement, I thought I would give asyncio a try.
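
For context, this is roughly what that failure looks like from a handler; a minimal sketch (the handler and the exact error text are illustrative, not taken from my script):

import multiprocessing

def handler(event, context):
    # The POSIX semaphores behind multiprocessing's locks are backed by /dev/shm,
    # which is not mounted on Lambda, so creating a Pool (or a Queue) blows up.
    try:
        with multiprocessing.Pool(4) as pool:
            return pool.map(str, range(10))
    except OSError as exc:
        # On Lambda this is typically "[Errno 38] Function not implemented"
        return {'error': str(exc)}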

I am using the latest version of aioboto3 (8.0.5) on Python 3.8.

My problem is that I cannot see any improvement between naively downloading the files sequentially and multiplexing the downloads with an asyncio event loop.

Here are both versions of my code.

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import boto3
import aioboto3

BUCKET = 'some-bucket'
KEYS = [
    'some/key/1',
    [...]
    'some/key/10',
]

async def download_aio():
    """Concurrent download of all objects from S3"""
    async with aioboto3.client('s3') as s3:
        objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
        objects = await asyncio.gather(*objects)
        buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
    """Sequentially download all objects from S3"""
    s3 = boto3.client('s3')
    for key in KEYS:
        obj = s3.get_object(Bucket=BUCKET, Key=key)
        obj['Body'].read()

def run_sequential():
    download()

def run_concurrent():
    loop = asyncio.get_event_loop()
    #loop.set_default_executor(ProcessPoolExecutor(10))
    #loop.set_default_executor(ThreadPoolExecutor(10))
    loop.run_until_complete(download_aio())

Timings of run_sequential() and run_concurrent() are very similar (about 3 seconds for a dozen 10MB files). I am convinced the concurrent version is not actually concurrent, for several reasons:

  • I tried switching to a ProcessPoolExecutor / ThreadPoolExecutor, and the processes/threads are indeed spawned for the duration of the function, yet they do nothing (see the sketch after this list)
  • the timing is nearly identical between the sequential and concurrent versions, even though my network interface is definitely not saturated and I am not CPU-bound either
  • the time taken by the concurrent version grows linearly with the number of files
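
For what it's worth, here is a sketch of what the default executor actually covers (not code from my script; it reuses the BUCKET / KEYS constants from above): only work submitted through loop.run_in_executor() lands on it, while coroutines awaited directly, like the aioboto3 calls, never touch it.

import asyncio
from concurrent.futures import ThreadPoolExecutor

import boto3

def blocking_get(key):
    # Plain blocking boto3 call; runs inside one of the executor's threads
    s3 = boto3.client('s3')
    return s3.get_object(Bucket=BUCKET, Key=key)['Body'].read()

async def download_via_executor():
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(10))
    # Only these run_in_executor() calls are dispatched to the thread pool
    return await asyncio.gather(
        *(loop.run_in_executor(None, blocking_get, k) for k in KEYS))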

I am sure something is missing, but I just can't wrap my head around it.

Any ideas?

Workaround

After spending a few hours trying to understand how to use aioboto3 properly, I decided to just switch to my backup solution. I ended up rolling my own naive version of multiprocessing.Pool that works in an AWS Lambda environment.

If someone stumbles upon this thread in the future, here it is. It is far from perfect for anything beyond my simple case, but it is easy enough to drop in as-is as a replacement for multiprocessing.Pool.

from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


class Pool:
    """Naive implementation of a process pool with mp.Pool API.

    This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
    is not mounted in an AWS Lambda environment.
    """

    def __init__(self, process_count=1):
        assert process_count >= 1
        self.process_count = process_count

    @staticmethod
    def wrap_pipe(pipe, index, func):
        def wrapper(args):
            try:
                result = func(args)
            except Exception as exc:  # pylint: disable=broad-except
                result = exc
            pipe.send((index,result))
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def map(self, function, arguments):
        pending = list(enumerate(arguments))
        running = []
        finished = [None] * len(pending)
        while pending or running:
            # Fill the running queue with new jobs
            while len(running) < self.process_count:
                if not pending:
                    break
                index, args = pending.pop(0)
                pipe_parent, pipe_child = Pipe(False)
                process = Process(
                    target=Pool.wrap_pipe(pipe_child, index, function),
                    args=(args,))
                process.start()
                running.append((index, process, pipe_parent))
            # Wait for jobs to finish
            for pipe in wait(list(map(lambda t: t[2], running))):
                index, result = pipe.recv()
                # Remove the finished job from the running list
                running = list(filter(lambda x: x[0] != index, running))
                # Add the result to the finished list
                finished[index] = result

        return finished
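
For reference, a minimal usage sketch, assuming the Pool class above sits in the same module and reusing the BUCKET / KEYS constants from the first snippet (the fetch helper is just an example):

import boto3

def fetch(key):
    # Runs in a child process; each worker creates its own boto3 client
    s3 = boto3.client('s3')
    return len(s3.get_object(Bucket=BUCKET, Key=key)['Body'].read())

if __name__ == '__main__':
    with Pool(process_count=8) as pool:
        sizes = pool.map(fetch, KEYS)
    print(sizes)

One caveat compared to multiprocessing.Pool.map: an exception raised in a worker is returned in the result list (see wrap_pipe) rather than re-raised in the parent.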