在AWS S3中分块创建大型zip文件

问题描述

因此,这个问题最终与python和S3有关。

假设我有一个带有这些文件的S3存储桶:

file1 --------- 2GB
file2 --------- 3GB
file3 --------- 1.9GB
file4 --------- 5GB

这些文件是使用S3的预签名帖子URL上传

我需要做的就是让客户端能够以ZIP(或类似格式)下载它们,但是我不能在服务器存储的内存中进行下载,因为这是无服务器的设置。

根据我的理解,理想情况下,服务器需要:

  1. 在S3上开始多部分上传作业
  2. 可能需要将一个块作为zip文件的头发送到多部分作业;
  3. 以某种流的形式逐块下载存储桶中的每个文件,以免溢出 记忆
  4. 使用上面所述的流向他们创建一个zip块并将其发送到多部分作业中
  5. 完成多部分作业和zip文件

现在,老实说,我什至不知道如何实现这一目标,甚至还不知道,但有些问题是:

  • 如何在S3中分块下载文件?最好使用boto3或botocore
  • 如何在释放内存的同时大块创建一个zip文件
  • 如何在multipartupload中将所有这些连接起来?

编辑:现在我考虑了一下,也许我什至不需要将ZIP文件放在S3中,我可以直接流式传输给客户端,对吗?实际上那会好得多

以下是假设我在上面进行编辑的一些假设代码

  #Let's assume Flask
  @app.route(/'download_bucket_as_zip'):
  def stream_file():
    def stream():
      #Probably needs to yield zip headers/Metadata?
      for file in getFilesFromBucket():
         for chunk in file.readChunk(4000):
            zipchunk = bytesToZipChunk(chunk)
            yield zipchunk
    return Response(stream(),mimetype='application/zip')

解决方法

我需要做的就是让客户端能够下载它们 全部都以ZIP(或类似格式)保存,但是我既不能在内存中也不能这样做 服务器存储,因为这是无服务器设置。

当您少说服务器时,如果您要使用Lambda在S3中创建一个zip文件,则会遇到一些限制:

  • Lambda对可以执行多长时间有时间限制。
  • 由于Lambda有内存限制,因此在Lambda函数中组装大文件可能会遇到问题
  • Lambda对PUT调用的最大大小有限制。

基于上述原因,我认为以下方法更好:

  • 需要文件时,可以动态创建EC2实例。也许您的lambda函数可以触发EC2实例的创建。
  • 将所有文件复制到计算机甚至EFS的实例存储中。
  • 将文件压缩为zip
  • 将文件上传回S3或直接提供文件
  • 杀死EC2实例。

我认为这将大大简化您必须编写的代码,因为在笔记本电脑/台式机上运行的任何代码都可能在EC2实例上运行。您也没有lambda的时间/空间限制。

由于将zip文件上传回S3后就可以摆脱EC2实例,因此您不必担心服务器始终运行的成本-只需在需要时将其旋转并杀死它即可完成后。

用于压缩文件夹中多个文件的代码可能很简单:

来自:https://code.tutsplus.com/tutorials/compressing-and-extracting-files-in-python--cms-26816

import os
import zipfile
 
fantasy_zip = zipfile.ZipFile('C:\\Stories\\Fantasy\\archive.zip','w')
 
for folder,subfolders,files in os.walk('C:\\Stories\\Fantasy'):
 
    for file in files:
        if file.endswith('.pdf'):
            fantasy_zip.write(os.path.join(folder,file),os.path.relpath(os.path.join(folder,'C:\\Stories\\Fantasy'),compress_type = zipfile.ZIP_DEFLATED)
 
fantasy_zip.close()
,
import io


class S3File(io.RawIOBase):
    def __init__(self,s3_object):
        self.s3_object = s3_object
        self.position = 0

    def __repr__(self):
        return "<%s s3_object=%r>" % (type(self).__name__,self.s3_object)

    @property
    def size(self):
        return self.s3_object.content_length

    def tell(self):
        return self.position

    def seek(self,offset,whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            self.position = offset
        elif whence == io.SEEK_CUR:
            self.position += offset
        elif whence == io.SEEK_END:
            self.position = self.size + offset
        else:
            raise ValueError("invalid whence (%r,should be %d,%d,%d)" % (
                whence,io.SEEK_SET,io.SEEK_CUR,io.SEEK_END
            ))

        return self.position

    def seekable(self):
        return True

    def read(self,size=-1):
        if size == -1:
            # Read to the end of the file
            range_header = "bytes=%d-" % self.position
            self.seek(offset=0,whence=io.SEEK_END)
        else:
            new_position = self.position + size

            # If we're going to read beyond the end of the object,return
            # the entire object.
            if new_position >= self.size:
                return self.read()

            range_header = "bytes=%d-%d" % (self.position,new_position - 1)
            self.seek(offset=size,whence=io.SEEK_CUR)

        return self.s3_object.get(Range=range_header)["Body"].read()

    def readable(self):
        return True


if __name__ == "__main__":
    import zipfile

    import boto3

    s3 = boto3.resource("s3")
    s3_object = s3.Object(bucket_name="bukkit",key="bagit.zip")

    s3_file = S3File(s3_object)

    with zipfile.ZipFile(s3_file) as zf:
        print(zf.namelist())
,

您的问题非常复杂,因为解决该问题可能会使您陷入困境。

我相信Rahul Iyer处在正确的轨道上,因为恕我直言,启动新的EC2实例并压缩该实例上的文件并将它们移回到仅向客户端提供zip文件的S3存储桶会更容易。

如果文件较小,则可以在客户端请求文件时使用AWS Cloudfront处理压缩。

在研究期间,我确实注意到其他语言(例如.Net和Java)具有处理流入zip文件的API。我还查看了zipstream,它已经分叉了好几次了。尚不清楚如何使用zipstream来流式传输文件以进行压缩。

以下代码将对文件进行分块,并将卡盘写入zip文件。输入文件接近12Gbs,输出文件接近5Gbs。

在测试过程中,我没有发现内存使用或大峰值的任何主要问题。

我确实在以下帖子之一中添加了一些伪S3代码。我认为需要更多测试才能了解此代码如何在S3中的文件上工作。

from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED

# This module is needed for ZIP_DEFLATED
import zlib


class UnseekableStream(RawIOBase):
def __init__(self):
    self._buffer = b''

def writable(self):
    return True

def write(self,b):
    if self.closed:
        raise ValueError('The stream was closed!')
    self._buffer += b
    return len(b)

def get(self):
    chunk = self._buffer
    self._buffer = b''
    return chunk


def zipfile_generator(path,stream):
   with ZipFile(stream,mode='w') as zip_archive:
       z_info = ZipInfo.from_file(path)
       z_info.compress_type = ZIP_DEFLATED
       with open(path,'rb') as entry,zip_archive.open(z_info,mode='w') as dest: 
          for chunk in iter(lambda: entry.read(16384),b''): # 16384 is the maximum size of an SSL/TLS buffer.
             dest.write(chunk)
             yield stream.get()
 yield stream.get()


stream = UnseekableStream()
# each on the input files was 4gb
files = ['input.txt','input2.txt','input3.txt']
with open("test.zip","wb") as f:
   for item in files:
      for i in zipfile_generator(item,stream):
         f.write(i)
         f.flush()
stream.close()
f.close()

伪代码s3 /邮政编码

此代码严格是假设的,因为它需要测试。

from io import RawIOBase
from zipfile import ZipFile
from zipfile import ZipInfo
from zipfile import ZIP_DEFLATED
import os

import boto3

# This module is needed for ZIP_DEFLATED
import zlib

session = boto3.Session(
aws_access_key_id='XXXXXXXXXXXXXXXXXXXXXXX',aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',region_name='XXXXXXXXXX')

s3 = session.resource('s3')
bucket_name = s3.Bucket('bucket name')

class UnseekableStream(RawIOBase):
   def __init__(self):
      self._buffer = b''

   def writable(self):
      return True

   def write(self,b):
      if self.closed:
        raise ValueError('The stream was closed!')
    self._buffer += b
    return len(b)

    def get(self):
      chunk = self._buffer
      self._buffer = b''
      return chunk


def zipfile_generator(path,mode='w') as dest:
           for chunk in iter(lambda: entry.read(16384),b''):
            dest.write(chunk)
              yield stream.get()
    yield stream.get()


stream = UnseekableStream()
with open("test.zip","wb") as f:
   for file in bucket_name.objects.all():
     obj = s3.get_object(Bucket=bucket_name,Key=file.key)
     for i in zipfile_generator(obj.get(),stream):
        f.write(i)
        f.flush()
stream.close()
f.close()
,

也许最好使用JavaScript编写的zip编码器之一,例如StreamBuilder( stream: Firestore.instance .collection("customers") .document(currentUser.uid) .collection("favSalons") .snapshots(),builder: (context,AsyncSnapshot<QuerySnapshot> snapshot) { if (snapshot.hasData && snapshot.connectionState == ConnectionState.active) { return Container( margin: EdgeInsets.only(bottom: screenHeight * 0.33),child: new ListView( children: getFavSalons(snapshot),),); } return LoadingSalon(); }),。或类似JSZip