问题描述
因此,这个问题最终与python和S3有关。
file1 --------- 2GB
file2 --------- 3GB
file3 --------- 1.9GB
file4 --------- 5GB
我需要做的就是让客户端能够以ZIP(或类似格式)下载它们,但是我不能在服务器存储的内存中进行下载,因为这是无服务器的设置。
根据我的理解,理想情况下,服务器需要:
- 在S3上开始多部分上传作业
- 可能需要将一个块作为zip文件的头发送到多部分作业;
- 以某种流的形式逐块下载存储桶中的每个文件,以免溢出 记忆
- 使用上面所述的流向他们创建一个zip块并将其发送到多部分作业中
- 完成多部分作业和zip文件
现在,老实说,我什至不知道如何实现这一目标,甚至还不知道,但有些问题是:
编辑:现在我考虑了一下,也许我什至不需要将ZIP文件放在S3中,我可以直接流式传输给客户端,对吗?实际上那会好得多
以下是假设我在上面进行编辑的一些假设代码:
#Let's assume Flask
@app.route(/'download_bucket_as_zip'):
def stream_file():
def stream():
#Probably needs to yield zip headers/Metadata?
for file in getFilesFromBucket():
for chunk in file.readChunk(4000):
zipchunk = bytesToZipChunk(chunk)
yield zipchunk
return Response(stream(),mimetype='application/zip')
解决方法
我需要做的就是让客户端能够下载它们 全部都以ZIP(或类似格式)保存,但是我既不能在内存中也不能这样做 服务器存储,因为这是无服务器设置。
当您少说服务器时,如果您要使用Lambda在S3中创建一个zip文件,则会遇到一些限制:
- Lambda对可以执行多长时间有时间限制。
- 由于Lambda有内存限制,因此在Lambda函数中组装大文件可能会遇到问题
- Lambda对PUT调用的最大大小有限制。
基于上述原因,我认为以下方法更好:
- 需要文件时,可以动态创建EC2实例。也许您的lambda函数可以触发EC2实例的创建。
- 将所有文件复制到计算机甚至EFS的实例存储中。
- 将文件压缩为zip
- 将文件上传回S3或直接提供文件
- 杀死EC2实例。
我认为这将大大简化您必须编写的代码,因为在笔记本电脑/台式机上运行的任何代码都可能在EC2实例上运行。您也没有lambda的时间/空间限制。
由于将zip文件上传回S3后就可以摆脱EC2实例,因此您不必担心服务器始终运行的成本-只需在需要时将其旋转并杀死它即可完成后。
用于压缩文件夹中多个文件的代码可能很简单:
来自:https://code.tutsplus.com/tutorials/compressing-and-extracting-files-in-python--cms-26816
import os
import zipfile
fantasy_zip = zipfile.ZipFile('C:\\Stories\\Fantasy\\archive.zip','w')
for folder,subfolders,files in os.walk('C:\\Stories\\Fantasy'):
for file in files:
if file.endswith('.pdf'):
fantasy_zip.write(os.path.join(folder,file),os.path.relpath(os.path.join(folder,'C:\\Stories\\Fantasy'),compress_type = zipfile.ZIP_DEFLATED)
fantasy_zip.close()
,
import io
class S3File(io.RawIOBase):
def __init__(self,s3_object):
self.s3_object = s3_object
self.position = 0
def __repr__(self):
return "<%s s3_object=%r>" % (type(self).__name__,self.s3_object)
@property
def size(self):
return self.s3_object.content_length
def tell(self):
return self.position
def seek(self,offset,whence=io.SEEK_SET):
if whence == io.SEEK_SET:
self.position = offset
elif whence == io.SEEK_CUR:
self.position += offset
elif whence == io.SEEK_END:
self.position = self.size + offset
else:
raise ValueError("invalid whence (%r,should be %d,%d,%d)" % (
whence,io.SEEK_SET,io.SEEK_CUR,io.SEEK_END
))
return self.position
def seekable(self):
return True
def read(self,size=-1):
if size == -1:
# Read to the end of the file
range_header = "bytes=%d-" % self.position
self.seek(offset=0,whence=io.SEEK_END)
else:
new_position = self.position + size
# If we're going to read beyond the end of the object,return
# the entire object.
if new_position >= self.size:
return self.read()
range_header = "bytes=%d-%d" % (self.position,new_position - 1)
self.seek(offset=size,whence=io.SEEK_CUR)
return self.s3_object.get(Range=range_header)["Body"].read()
def readable(self):
return True
if __name__ == "__main__":
import zipfile
import boto3
s3 = boto3.resource("s3")
s3_object = s3.Object(bucket_name="bukkit",key="bagit.zip")
s3_file = S3File(s3_object)
with zipfile.ZipFile(s3_file) as zf:
print(zf.namelist())
,
您的问题非常复杂,因为解决该问题可能会使您陷入困境。
我相信Rahul Iyer处在正确的轨道上,因为恕我直言,启动新的EC2实例并压缩该实例上的文件并将它们移回到仅向客户端提供zip文件的S3存储桶会更容易。
如果文件较小,则可以在客户端请求文件时使用AWS Cloudfront处理压缩。
在研究期间,我确实注意到其他语言(例如.Net和Java)具有处理流入zip文件的API。我还查看了zipstream,它已经分叉了好几次了。尚不清楚如何使用zipstream来流式传输文件以进行压缩。
以下代码将对文件进行分块,并将卡盘写入zip文件。输入文件接近12Gbs,输出文件接近5Gbs。
在测试过程中,我没有发现内存使用或大峰值的任何主要问题。
我确实在以下帖子之一中添加了一些伪S3代码。我认为需要更多测试才能了解此代码如何在S3中的文件上工作。
from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED
# This module is needed for ZIP_DEFLATED
import zlib
class UnseekableStream(RawIOBase):
def __init__(self):
self._buffer = b''
def writable(self):
return True
def write(self,b):
if self.closed:
raise ValueError('The stream was closed!')
self._buffer += b
return len(b)
def get(self):
chunk = self._buffer
self._buffer = b''
return chunk
def zipfile_generator(path,stream):
with ZipFile(stream,mode='w') as zip_archive:
z_info = ZipInfo.from_file(path)
z_info.compress_type = ZIP_DEFLATED
with open(path,'rb') as entry,zip_archive.open(z_info,mode='w') as dest:
for chunk in iter(lambda: entry.read(16384),b''): # 16384 is the maximum size of an SSL/TLS buffer.
dest.write(chunk)
yield stream.get()
yield stream.get()
stream = UnseekableStream()
# each on the input files was 4gb
files = ['input.txt','input2.txt','input3.txt']
with open("test.zip","wb") as f:
for item in files:
for i in zipfile_generator(item,stream):
f.write(i)
f.flush()
stream.close()
f.close()
伪代码s3 /邮政编码
此代码严格是假设的,因为它需要测试。
from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED
import os
import boto3
# This module is needed for ZIP_DEFLATED
import zlib
session = boto3.Session(
aws_access_key_id='XXXXXXXXXXXXXXXXXXXXXXX',aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',region_name='XXXXXXXXXX')
s3 = session.resource('s3')
bucket_name = s3.Bucket('bucket name')
class UnseekableStream(RawIOBase):
def __init__(self):
self._buffer = b''
def writable(self):
return True
def write(self,b):
if self.closed:
raise ValueError('The stream was closed!')
self._buffer += b
return len(b)
def get(self):
chunk = self._buffer
self._buffer = b''
return chunk
def zipfile_generator(path,mode='w') as dest:
for chunk in iter(lambda: entry.read(16384),b''):
dest.write(chunk)
yield stream.get()
yield stream.get()
stream = UnseekableStream()
with open("test.zip","wb") as f:
for file in bucket_name.objects.all():
obj = s3.get_object(Bucket=bucket_name,Key=file.key)
for i in zipfile_generator(obj.get(),stream):
f.write(i)
f.flush()
stream.close()
f.close()
,
也许最好使用JavaScript编写的zip编码器之一,例如StreamBuilder(
stream: Firestore.instance
.collection("customers")
.document(currentUser.uid)
.collection("favSalons")
.snapshots(),builder:
(context,AsyncSnapshot<QuerySnapshot> snapshot) {
if (snapshot.hasData && snapshot.connectionState == ConnectionState.active) {
return Container(
margin:
EdgeInsets.only(bottom: screenHeight * 0.33),child: new ListView(
children: getFavSalons(snapshot),),);
}
return LoadingSalon();
}),
。或类似JSZip