如何减少铸造数据集中的文件数?

问题描述

我的数据集有20000个文件,每个文件很小。 如何减少文件数量,什么是最佳数量

解决方法

最简单的方法是在转换结束时显式地执行repartition()(如果分区数严格从原始数减少,则coalesce())。

这应该是您返回/写出结果之前的最终通话。

这看起来像:

# ...

@transform_df(
  # ... inputs
)
def my_compute_function(my_inputs):
  # ... my transform logic ...

  df = df.coalesce(500) 
  # df = df.repartition(500) # this also works but is slightly slower than coalesce
  return df

这是称为“存储桶”的参考的先行步骤。

最佳存储桶数取决于您使用的数据规模。成功构建后,通过观察磁盘上数据集的总大小,可以计算出最佳的存储桶数量。

如果数据集的大小为128GB,则最终将需要128MB文件,因此存储桶数为:

128 GB * (1000 MB / 1 GB) * (1 file / 128MB) -> 1000 files

注意:这不是精确的计算,因为更改数据段数后的最终数据集大小将因Snappy + Parquet写入中使用的数据压缩而有所不同。您会注意到文件大小与预期的略有不同,因此在上例中最终可能需要1100或900个文件

,

由于这是我不得不解决的问题,因此,我决定编写一本更详细的指南,其中包含许多不同的技术,优缺点和存在的理由。

为什么要减少文件数量?

有两个很好的理由来避免包含许多文件的数据集:

  • 读取性能可能会更差。当数据分散在许多小文件中时,轮廓(分析)之类的应用程序的性能可能会严重受损,因为执行者必须承担从备份文件系统下载许多小文件的开销。
  • 如果后备文件系统是HDFS ,许多小文件将增加hadoop名称节点和八卦协议的堆压力。 HDFS不能很好地处理许多小文件,因为它不能流式传输/分页文件系统中的文件列表,而是构造包含所有文件的完整枚举的消息。当HDFS中有成千上万个文件系统对象时,最终会碰到名称节点RPC消息大小限制(可以在配置中增加)和可用堆内存(可以在配置中增加) ...如果您有更多的可用内存。)节点间的通信越来越慢。
  • 转换变慢,因为(当前甚至是增量转换)驱动程序线程必须从目录中检索当前视图中所有文件的完整列表,以及交易的元数据和出处(这仅是切线相关的,但很多文件与许多事务相关联并不少见
  • 转换可以使驱动程序OOM ,因为文件集和事务集会在某些时间点保留在内存中。可以通过为驱动程序分配更大的内存配置文件来解决此问题-但这会增加成本和/或减少可用于其他管道的资源。

为什么我们首先要在数据集中得到很多文件?

最终导致包含许多文件的数据集通常是由以下三个原因之一引起的:

  • 提取许多小文件的文件
  • (行为不当)转换会产生许多小文件。每次在火花中执行大范围操作时,都可能发生混洗。例如,当执行groupBy(这意味着随机播放)时,spark默认情况下会选择将数据重新划分为200个新分区,这对于例如增量转换。转换也会由于分区不正确而产生太多的输出文件(如下所述)。
  • 一个递增运行且频繁运行的管道。每次管道运行并处理(通常是很小的)一段数据时,都会在每个数据集上创建一个新事务,每个事务至少包含一个文件。

接下来,我将列出所有我知道的减少数据集中文件数量的方法,以及它们的缺点和优点,以及适用时的一些特征。

摄取时(磁石变压器)

最好的选择之一是避免一开始就拥有多个文件。从中提取许多文件时作为类似文件系统的源,像“连接转换器”这样的magritte转换器可能有助于将许多CSV,JSON或XML文件组合到一个文件中。在适用时,连接并应用gzip转换器是一种特别有效的策略,因为它通常会将XML和类似文本格式的大小减少94%左右。

主要限制是要应用此功能,您需要

  • 每当提取运行时就有多个文件可用(因此对于在频繁更新的数据源上频繁运行的提取效果不佳)
  • 有一个数据源,可为您提供可以串联的文件

也可以将许多文件压缩为更少的文件(使用.tar.bz2,.tar.gz,.zip,.rar等格式),但这随后需要知道的下游转换此文件格式并手动解压缩(文档中提供了此示例),因为铸造厂无法透明地提供这些档案中的数据。但是,没有预制的magritte处理器可以做到这一点,在我应用这种技术的场合,我在摄取之前使用bash脚本执行了此任务,这显然不理想。

背景压实

铸造厂中有一种新机制,可将您写入的数据集与读取的数据集分离。本质上,有一个后台作业正在运行,它会在您添加文件时将文件混洗为优化索引,这样,数据集的读取(大部分)可以转到该优化索引,而不是编写者留下的(通常有些武断)数据布局。

这具有多种好处(例如自动生成针对最常见的读取模式进行了优化的数据布局),其中之一是它可以在后台“精简”您的数据集。

从这样的数据集读取时,您的读取实际上会击中索引以及输入数据集(其中包含尚未由后台进程合并到索引中的所有文件)。

最大的优点是,这会在后台自动发生,并且无论您的数据摄取或转换有多混乱,您都可以简单地写出数据(在写操作时不会遇到麻烦,并尽快将数据发送给消费者)最终仍然是一个很好的分区数据集,只有很少的文件。

这里的主要限制是,这仅适用于Spark可以原生理解的格式的数据集,例如镶木地板,avro,json,csv等。提取任意文件后,解决方法是将这些文件打包到例如吞食前的木地板。这样,铸造厂仍可以随着时间的推移合并这些镶木文件中的多个。

此功能尚未完全提供给最终用户(但计划默认情况下启用所有功能。)如果您认为这是您的其中一条管道最理想的解决方案,那么您的Palantir POC可以开始与团队一起启用此功能。

分区和合并

Coalescing是spark中的一项操作,可以在不具有较大依赖性的情况下减少分区数(spark中唯一的此类操作)。合并速度很快,因为它可以最大程度地减少改组。与以前的spark版本相比,它的确切工作方式已经发生了变化(并且那里有很多冲突的信息),但是通常比repartition快。但是,它有一个很大的警告:它降低了整个转换的并行度

即使您在写数据之前在最后coalesce,spark也会使整个查询计划适应在整个分区中使用更少的分区,从而减少了使用的执行程序,这意味着您减少并行性。

重新分区很相似,但是它会插入一个完整的改组阶段。这会带来更高的性能成本,但是这意味着从此阶段出来的数据基本上可以得到保证(无论输入如何)均经过良好分区。尽管repartition本身有点昂贵,但它不会遇到在整个转换过程中减少并行度的问题。

这意味着,与最终的工作量相比,如果最终写入的数据量不是那么大,总体而言,使用repartitioncoalesce可获得更好的性能在此之上,由于能够在更多执行程序上处理数据的能力最终超过了改组的弊端。根据我的经验,repartition通常会在这里胜出,除非您的转换非常简单。

值得讨论的一个特殊用例是增量管道。如果您的增量管道相对简单明了,例如映射和过滤,然后进行coalesce很好。但是,许多增量管道也会读取非常大的数据集的快照视图。例如,增量管道可能会接收到一行新数据,并读取整个先前的输出数据集(可能是数百万行),因此请查看输出数据集中是否已存在该行。如果已经存在,则不发出任何行;如果不存在,则追加该行。将一小部分增量数据与大型静态数据集等结合在一起时,也会发生类似情况。

在这种情况下,转换是渐进式的,但是它仍然可以从高并行度中受益,因为它仍然可以处理大量数据。

我的大致指导原则是:

  • 将转换作为快照运行:repartition到合理的数量
  • 转换以增量方式运行,不需要高度的并行性:coalesce(1)
  • 转换逐渐运行,但仍然受益于并行性:repartition(1)

如果写速度/流水线延迟非常重要,则这些选项都不可接受。在这种情况下,我会考虑使用背景压缩。

常规快照

作为上一点的扩展,为了保持增量管道的高性能,我希望在其上安排定期的快照,这使我可以不时地对数据集进行一次重新分区,基本上执行“压缩”操作。 / p>

我已经在此处介绍了一种设置方法:How to force an incremental Foundry Transforms job to build non-incrementally without bumping the semantic version?

我通常会安排快照,例如周末。在整个星期中,管道中的每个数据集(可能有数百个数据集)将累积成千上万个事务和文件。然后在周末,随着计划的快照在管道中滚动,每个数据集将重新划分为100个文件。

AQE

最近,AQE在铸造厂可用。本质上(出于本讨论的目的),AQE将coalesce操作注入到阶段中,在这些阶段中,无论如何您都已经在执行洗牌操作,这取决于先前操作的结果。通常,这可以改善分区(从而提高文件数量),但据报道在极少数情况下也会使分区变得更糟(但我本人并未观察到这一点)。

AQE默认情况下处于启用状态,但是如果您想尝试禁用它,可以将其应用于火花转换。

存储和分区

分组和分区在此讨论中有些切线,因为它们主要是关于布局数据以优化读取数据的特定方法。目前,这两种技术均不适用于增量管道。

一个常见的错误是写出由高基数的列(例如时间戳)划分的数据集。在具有一千万个唯一时间戳的数据集中,这将在输出数据集中产生(至少)一千万个文件。

在这些情况下,应固定转换并通过应用保留删除旧事务(包含数百万个文件)。

其他黑客行为

还可以使用其他方法来精简数据集,例如创建“环回”转换以读取先前的输出并对其进行重新分区,或者手动打开数据集上的事务以将其重写。

这些都是非常骇人听闻的,但是我认为这些是不可取的,应该避免。如今,后台压缩主要以一种更加优雅,可靠且不那么笨拙的方式解决了这个问题。