Problem description
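I have a class called Downloader that I want to use to handle transactions with S3. I want to use concurrency to download content from S3 as fast as possible, as per this SO post, but when I put all of the concurrency code inside the class I get some strange behavior.
Running the code below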
import boto3
import botocore
import numpy as np
from botocore.exceptions import ClientError
from concurrent import futures

class Downloader:
    def __init__(self, aws_key, aws_id):
        self.s3_client = boto3.resource('s3', aws_access_key_id=aws_id, aws_secret_access_key=aws_key)

    def download_s3(self, key, bucket='bucket'):
        try:
            # a boto3 resource exposes its low-level client as .meta.client
            f = self.s3_client.meta.client.get_object(Bucket=bucket, Key=key)['Body'].read()
        except ClientError as e:
            print(f'Key {key} not found')
            return None
        return np.frombuffer(f)

    def download_s3_parallel(self, keys):
        print(f'executor called...')
        with futures.ThreadPoolExecutor(max_workers=1) as executor:
            print(f'launching threads')
            future_to_key = {executor.submit(self.download_s3, key): key for key in keys}
            print(future_to_key)
            print(f'looping through completed threads')
            for future in futures.as_completed(future_to_key):
                key = future_to_key[future]
                exception = future.exception()
                if not exception:
                    yield key, future.result()
                else:
                    yield key, exception

keys = ['key1', 'key2', 'key3']
dl = Downloader('<my_secret_key>', '<my_aws_id>')
all_objects = dl.download_s3_parallel(keys)
gives no output, but when I comment out the for loop, as follows:
class Downloader:
    # ... same as above, up to the dictionary comprehension ...
            future_to_key = {executor.submit(self.download_s3, key): key for key in keys}
            print(future_to_key)
            print(f'looping through completed threads')
            # for future in futures.as_completed(future_to_key):
            #     key = future_to_key[future]
            #     exception = future.exception()
            #     if not exception:
            #         yield key, future.result()
            #     else:
            #         yield key, exception
the output becomes:
executor called...
launching threads
{<Future at 0x7f5f8dd12430 state=running>: 'key1',<Future at 0x7f5f8dd12730 state=pending>: 'key2',<Future at 0x7f5f8dd2c8b0 state=pending>: 'key3'}
looping through completed threads
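So my question is: why does the for loop stop the entire download_s3_parallel method from running? Even the print statement at the very beginning of the method is never called. What is causing this?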
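Both symptoms are consistent with how Python treats any function whose body contains `yield`: the `yield` statements inside the for loop make `download_s3_parallel` a generator function, so calling it only creates a generator object and does not execute any line of the body (not even the first `print`) until something iterates over it. Commenting out the loop removes every `yield`, so the method becomes an ordinary function again and runs as soon as it is called. A minimal sketch with two hypothetical toy functions (no S3 involved) shows the difference:

```python
import inspect

calls = []  # records when a function body actually starts running

def with_yield():
    calls.append("body started")
    yield 1

def without_yield():
    calls.append("body started")
    return 1

gen = with_yield()  # only creates a generator object; the body has not run yet
print(inspect.isgeneratorfunction(with_yield))     # True
print(inspect.isgeneratorfunction(without_yield))  # False
print(calls)        # []
print(next(gen))    # runs the body up to the first yield: 1
print(calls)        # ['body started']
```

`inspect.isgeneratorfunction(Downloader.download_s3_parallel)` would likewise return `True` for the version with the loop and `False` for the commented-out version.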
If it is because the for loop raises an error, how can I find out what the error is?
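To see whether anything in the loop fails, the generator first has to be consumed, for example by iterating over `dl.download_s3_parallel(keys)` or wrapping it in `list(...)`. Note also that this method never raises a worker's exception: it yields it as the second element of the `(key, value)` pair, so the caller has to check for it. The sketch below is a standalone copy of the same `as_completed` pattern; `fake_download` and `download_parallel` are hypothetical names, and `fake_download` stands in for the S3 call so the example runs without AWS credentials.

```python
from concurrent import futures

def fake_download(key):
    # hypothetical stand-in for Downloader.download_s3; fails for one key
    if key == "bad":
        raise KeyError(key)
    return f"data for {key}"

def download_parallel(keys, max_workers=4):
    # same pattern as download_s3_parallel: yields (key, result or exception)
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_key = {executor.submit(fake_download, key): key for key in keys}
        for future in futures.as_completed(future_to_key):
            key = future_to_key[future]
            exc = future.exception()
            yield key, (exc if exc is not None else future.result())

results = {}
errors = {}
# Iterating is what actually runs the generator body
for key, value in download_parallel(["key1", "bad", "key3"]):
    if isinstance(value, Exception):
        errors[key] = value
    else:
        results[key] = value

print(sorted(results))  # ['key1', 'key3']
print(errors)           # {'bad': KeyError('bad')}
```

If you would rather get the full traceback, call `future.result()` instead of `future.exception()`: it re-raises the worker's exception in the thread that is consuming the generator.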