问题描述
查看关于 this 问题的代码,我希望能够创建一个数据流管道,该管道可以查看特定 gcs 存储桶文件夹中的所有文件,并说明具有最大数据量的最终子目录字节数。我会编写类似于以下内容的代码:
class SortFiles(beam.DoFn):
def __init__(self,gfs):
self.gfs = gfs
def process(self,file_Metadata):
if file_Metadata.size_in_bytes > 0:
# Sort the files here?
class SortFolders(beam.DoFn):
def __init__(self,file_Metadata):
if file_Metadata.size_in_bytes > 0:
# Sort the folders here based on maximum addition of a combination
# of the file sizes and file numbers
def delete_empty_files():
options = PipelineOptions(...)
gfs = gcs.GCSFileSystem(pipeline_options)
p = beam.Pipeline(options=pipeline_options)
discover_empty = p | 'Filenames' >> beam.Create(gfs.match(gs_folder).Metadata_list)
| 'Reshuffle' >> beam.Reshuffle()
| 'SortFilesbySize' >> beam.ParDo(SortFiles(gfs))
| 'SortFoldersbySize' >> beam.ParDo(SortFolders(gfs))
| 'OutputFolders' >> ...
我还没有决定是按总字节数还是按其中的文件总数列出文件夹。我将如何解决这个问题?另一个问题在于,我希望能够找到此任务的最终子目录而不是其父文件夹。
解决方法
GCSFileSystem
有一个函数,du
会告诉你特定路径下的大小。 https://gcsfs.readthedocs.io/en/latest/api.html?highlight=du#gcsfs.core.GCSFileSystem
在阅读您的问题时,我认为您想
- 首先在存储桶中查找本身不包含目录的所有目录(如果我理解“最终子目录”)
- 然后对它们中的每一个运行
du
, - 然后按大小对结果列表进行排序
如果您尝试对嵌套的文件进行计数:
- 列出所有对象,名称将是 a/、a/b.txt、a/b/c.txt 等
- 编写一个函数来统计每个子路径下嵌套的对象