问题描述
我正在将一个简单的python 3脚本移植到AWS Lambda。 该脚本很简单:它从十二个S3对象中收集信息并返回结果。
该脚本使用multiprocessing.Pool
并行收集所有文件。尽管multiprocessing
不能在AWS Lambda环境中使用,因为缺少/dev/shm
。
因此,我认为与其写一个肮脏的multiprocessing.Process
/ multiprocessing.Queue
替代品,我不如尝试asyncio
。
我正在Python 3.8上使用aioboto3
(8.0.5)的最新版本。
我的问题是,在天真的顺序下载文件与多路复用下载的异步事件循环之间,我似乎无法获得任何改善。
这是我的代码的两个版本。
import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor,ProcesspoolExecutor
import boto3
import aioboto3
BUCKET = 'some-bucket'
KEYS = [
'some/key/1',[...]
'some/key/10',]
async def download_aio():
"""Concurrent download of all objects from S3"""
async with aioboto3.client('s3') as s3:
objects = [s3.get_object(Bucket=BUCKET,Key=k) for k in KEYS]
objects = await asyncio.gather(*objects)
buffers = await asyncio.gather(*[o['Body'].read() for o in objects])
def download():
"""Sequentially download all objects from S3"""
s3 = boto3.client('s3')
for key in KEYS:
object = s3.get_object(Bucket=BUCKET,Key=key)
object['Body'].read()
def run_sequential():
download()
def run_concurrent():
loop = asyncio.get_event_loop()
#loop.set_default_executor(ProcesspoolExecutor(10))
#loop.set_default_executor(ThreadPoolExecutor(10))
loop.run_until_complete(download_aio())
run_sequential()
和run_concurrent()
的时间非常相似(十几个10MB的文件大约需要3秒)。
我确信并发版本不是,原因有多种:
- 我尝试切换到
Process/ThreadPoolExecutor
,并且在函数运行期间生成了进程/线程,尽管它们什么也没做 - 顺序和并发之间的时间几乎相同,尽管我的网络接口绝对没有饱和,并且cpu也未绑定
- 并发版本花费的时间随着文件数量的增加而线性增加。
我确定有些东西丢失了,但是我无法把头包裹住。
有什么想法吗?
解决方法
在花了几个小时试图了解如何正确使用aioboto3
之后,我决定只是切换到备份解决方案。
我最终推出了自己的multiprocessing.Pool
天真版本,可在AWS Lambda环境中使用。
如果将来有人偶然发现此线程,那就是。对于我的简单案例,它远非完美,但很容易按原样替换multiprocessing.Pool
。
from multiprocessing import Process,Pipe
from multiprocessing.connection import wait
class Pool:
"""Naive implementation of a process pool with mp.Pool API.
This is useful since multiprocessing.Pool uses a Queue in /dev/shm,which
is not mounted in an AWS Lambda environment.
"""
def __init__(self,process_count=1):
assert process_count >= 1
self.process_count = process_count
@staticmethod
def wrap_pipe(pipe,index,func):
def wrapper(args):
try:
result = func(args)
except Exception as exc: # pylint: disable=broad-except
result = exc
pipe.send((index,result))
return wrapper
def __enter__(self):
return self
def __exit__(self,exc_type,exc_value,exc_traceback):
pass
def map(self,function,arguments):
pending = list(enumerate(arguments))
running = []
finished = [None] * len(pending)
while pending or running:
# Fill the running queue with new jobs
while len(running) < self.process_count:
if not pending:
break
index,args = pending.pop(0)
pipe_parent,pipe_child = Pipe(False)
process = Process(
target=Pool.wrap_pipe(pipe_child,function),args=(args,))
process.start()
running.append((index,process,pipe_parent))
# Wait for jobs to finish
for pipe in wait(list(map(lambda t: t[2],running))):
index,result = pipe.recv()
# Remove the finished job from the running list
running = list(filter(lambda x: x[0] != index,running))
# Add the result to the finished list
finished[index] = result
return finished