Python“FileHandler”到 Azure BlobStorage - 没有这样的东西

问题描述

我遇到过这样的情况,我在 blob 存储中有一个非常大的 SAS 文件 (300GB),需要在 Azure ML 服务工作区中进行处理。他们的主要任务是将其转换为一堆镶木地板文件

当然,我可以将文件下载到 Azure ML 工作区的 FileShare 容器,然后使用 Pandas read_sas() 并定义适当的块大小来处理文件

    local_file_path = "./dld/mysas.sas7bat"
    with open(local_file_path,"wb") as local_file:
       downloader = blobclient.download_blob()
       downloader.readinto(local_file)
    
    reader = pd.read_sas(local_file_path,format='sas7bdat',chunksize=1000000)

    count = 0
    prefix="chunks/testchunk"
    chunk: pd.DataFrame
    for chunk in reader:
        count += 1
        name = prefix + str(count) + ".parquet"
        chunk.to_parquet(name,engine = "pyarrow")

这当然有效。但是,我希望有一种更有效的方法。就像能够直接从 blobstorage 创建“文件流”一样。无需先将其下载到挂载的 Azure 文件共享中。但我什么也没找到。所以我为 blob 存储编写了自己的“流包装器”:

class BlobStorageFileHandler(RawIOBase):

    def __init__(self,storageDownloader: StorageStreamDownloader):
        self.downloader = storageDownloader
        self.chunks = self.downloader.chunks()
        self.current_pos = 0
        self.current_chunk_len = 0
        self.current_chunk = None
        self._read_next_chunk()

    def _read_next_chunk(self):
        self.current_chunk: bytes = next(self.chunks,None)
        if self.current_chunk is None:
            self.current_chunk_len = 0
        else:
            self.current_chunk_len = len(self.current_chunk)

    def seekable(self) -> bool:
        return True

    def seek(self,offset,whence=SEEK_SET):
        print("seek: ",offset)

    def readable(self) -> bool:
        return True

    def read(self,size=-1):
        end_pos = self.current_pos + size
        
        # if more bytes are requested that are left in the current read bytes-chunk
        if end_pos > self.current_chunk_len:
            # number of bytes that have to be read from the old chunk
            rest_part = self.current_chunk_len - self.current_pos
            # number of bytes that have to be read from the next chunk
            new_part = end_pos - self.current_chunk_len

            old_chunk_end: bytes = b''
            if rest_part > 0:
                old_chunk_end = self.current_chunk[self.current_pos: self.current_pos + rest_part]

            self._read_next_chunk()

            new_chunk_part: bytes
            # if there was no further chunk left to be read
            if self.current_chunk is None:
                if rest_part > 0:
                    return old_chunk_end
                return b''
            
            if self.current_chunk_len > new_part:
                new_chunk_part = self.current_chunk[0: new_part]
            else:
                new_chunk_part = self.current_chunk

            self.current_pos = new_part
            return old_chunk_end + new_chunk_part
        else:
            result = self.current_chunk[self.current_pos:self.current_pos + size]
            self.current_pos += size
            return result

有了这个类,就不必先将数据下载到挂载的 FileShare(或任何本地目录,如果在本地使用):

    sas_file = BlobStorageFileHandler(blobclient.download_blob())
    reader = pd.read_sas(sas_file,chunksize=1000000)

这只是第一个解决方案,不是很复杂,但它按预期工作。

然而,我无法想象只有我一个人有这种需求,因此,我想知道是否有其他解决方案或现有的包装类,就像我上面展示的那样。

感谢任何输入。

谢谢 汉斯约格

解决方法

您也可以使用 download_blob 方法分块下载 blob。基本上,您需要指定 offset(起始位置)和 length(要读取的字节数)参数,并且该方法只会将这些字节返回给您。

在您当前的实现中,由于您没有指定这些参数,因此 SDK 正在下载整个 blob。