问题描述
@H_404_0@我正在尝试使用预签名的URL在s3存储桶中上传文件,它可以完美工作并将数据成功上传到存储桶,但是,我上传的文件非常大,我需要能够显示进度条。我已经尝试了StackOverflow和其他博客文章上提供的许多解决方案,但似乎无济于事。
@H_404_0@以下是使用预先签名的URL将数据上传到s3的代码段。
object_name = 'DataSet.csv'
response = create_presigned_post("mybucket_name",object_name)
fields = response['fields']
with open(object_name,'rb') as f:
files = {'file': (object_name,f)}
http_response = requests.post(response['url'],data=fields,files=files,stream=True)
print (http_response.status_code)
@H_404_0@它返回 204 状态,表示成功上传。
@H_404_0@现在,我可以对此代码进行哪些更改以显示进度条。
@H_404_0@ P.S
我在请求无效的情况下尝试了stream=True
。
我已经尝试过使用 tqdm 遍历响应,但是在这种情况下也行不通。
解决方法
我认为没有办法通过使用presignedUrl
并使用默认协议HTTP POST
请求上传大文件来做到这一点。
您可以通过使用AWS S3的分段上传机制来实现。这样,您可以知道上载的每个部分,并据此计算进度。
我创建了一个包含分段上传和presignedUrl
(打字稿)https://www.altostra.com/blog/multipart-uploads-with-s3-presigned-url
以下代码在Python上可以正常使用,我发现它here
import logging
import argparse
from boto3 import Session
import requests
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class S3MultipartUploadUtil:
"""
AWS S3 Multipart Upload Uril
"""
def __init__(self,session: Session):
self.session = session
self.s3 = session.client('s3')
self.upload_id = None
self.bucket_name = None
self.key = None
def start(self,bucket_name: str,key: str):
"""
Start Multipart Upload
:param bucket_name:
:param key:
:return:
"""
self.bucket_name = bucket_name
self.key = key
res = self.s3.create_multipart_upload(Bucket=bucket_name,Key=key)
self.upload_id = res['UploadId']
logger.debug(f"Start multipart upload '{self.upload_id}'")
def create_presigned_url(self,part_no: int,expire: int=3600) -> str:
"""
Create pre-signed URL for upload part.
:param part_no:
:param expire:
:return:
"""
signed_url = self.s3.generate_presigned_url(
ClientMethod='upload_part',Params={'Bucket': self.bucket_name,'Key': self.key,'UploadId': self.upload_id,'PartNumber': part_no},ExpiresIn=expire)
logger.debug(f"Create presigned url for upload part '{signed_url}'")
return signed_url
def complete(self,parts):
"""
Complete Multipart Uploading.
`parts` is list of dictionary below.
```
[ {'ETag': etag,'PartNumber': 1},{'ETag': etag,'PartNumber': 2},... ]
```
you can get `ETag` from upload part response header.
:param parts: Sent part info.
:return:
"""
res = self.s3.complete_multipart_upload(
Bucket=self.bucket_name,Key=self.key,MultipartUpload={
'Parts': parts
},UploadId=self.upload_id
)
logger.debug(f"Complete multipart upload '{self.upload_id}'")
logger.debug(res)
self.upload_id = None
self.bucket_name = None
self.key = None
def main():
parser = argparse.ArgumentParser()
parser.add_argument('target_file')
parser.add_argument('--bucket',required=True)
args = parser.parse_args()
target_file = Path(args.target_file)
bucket_name = args.bucket
key = target_file.name
max_size = 5 * 1024 * 1024
file_size = target_file.stat().st_size
upload_by = int(file_size / max_size) + 1
session = Session()
s3util = S3MultipartUploadUtil(session)
s3util.start(bucket_name,key)
urls = []
for part in range(1,upload_by + 1):
signed_url = s3util.create_presigned_url(part)
urls.append(signed_url)
parts = []
with target_file.open('rb') as fin:
for num,url in enumerate(urls):
part = num + 1
file_data = fin.read(max_size)
print(f"upload part {part} size={len(file_data)}")
res = requests.put(url,data=file_data)
print(res)
if res.status_code != 200:
return
etag = res.headers['ETag']
parts.append({'ETag': etag,'PartNumber': part})
print(parts)
s3util.complete(parts)
if __name__ == '__main__':
main()