为什么不应该在Python转换中使用collect?

问题描述

TL; DR:我听说有传言说某些PySpark函数在Transforms中不建议使用,但是我不确定哪些函数错误的,为什么会这样?

为什么在某些情况下我不能collect()将数据{{1}}列出并遍历行?

解决方法

这里有很多内容需要弄清才能得出最终结论,即collect()和其他功能是Spark的低效使用。

本地与分布式

首先,让我们介绍一下本地计算与分布式计算之间的区别。在Spark中,通常执行的pyspark.sql.functionspyspark.sql.DataFrame操作(例如join()groupBy())会将这些操作的执行委派给基础Spark库,以实现最大的性能。可以认为这只是将Python用作SQL之上的一种更方便的语言,您在其中懒惰地描述了Spark希望为您完成的操作。

通过这种方式,当您坚持在PySpark中执行SQL操作时,可以期望具有高度可扩展的性能,但仅适用于您可以用SQL表示的内容。这是人们通常可以采用的一种惰性方法并使用for循环来实现其转换,而不用考虑最佳策略。

让我们考虑一下您想简单地将单个值添加到DataFrame中的整数列的情况。您会在Stack Overflow和其他地方找到很多例子,在一些更微妙的情况下,他们建议使用collect()将数据带入Python列表,遍历每一行,然后将数据推回到DataFrame中。完成后,这是您可以在此处执行的一项策略。让我们考虑一下它在实践中的含义:将Spark中托管的数据带回构建驱动程序,以便在Python的每一行中使用单个线程进行循环,并在每一行中添加一个常量值一次。如果改为找到等效于此操作的SQL(在这种情况下很明显),Spark可以获取您的数据,然后大规模并行地将该值添加到各个行中。也就是说,如果您有64位执行者(可以用作工作人员的实例),那么您将有64个“核心”(这不是一个完美的类比,但接近)可以将数据拆分并发送到每个用于将值添加到列。这将使您大大更快地执行所需的最终结果操作。

在驱动程序上工作是我所谓的“本地”计算,而在执行程序中则是“并行”工作。

这可能是一个显而易见的示例,但是在处理更困难的转换(例如高级加窗运算或线性代数计算)时,通常很难记住这种差异。 Spark拥有可用于以分布式方式进行矩阵乘法和操作的库,以及Windows上一些相当高级的操作,这些操作首先需要更多考虑您的问题。

懒惰评估

使用PySpark的最有效方法是分派有关如何一次构建DataFrame的“指令”,以便Spark可以找出实现这些数据的最佳方法。这样,应尽可能避免使用强制执行DataFrame的计算的功能,以便您可以在代码中的某个点进行检查。它们表示Spark正在为满足您的print()语句或其他方法调用而付出额外的努力,而不是努力写出您的数据。

Scala中Java中的Python

Python运行时实际上是在JVM内部执行的,而JVM随后又与用Scala编写的Spark运行时通信。因此,对于每个要在Python中实现数据的collect()调用,Spark必须将数据实现为单个本地可用的DataFrame,然后将其从Scala合成为其等效的Java,最后从JVM可以迭代到Python等效项。这是一个效率极低的过程,无法并行化。 因此,强烈建议避免将数据呈现到Python的操作

避免的功能

那么,您应该避免使用什么功能?

  • 收集
  • 服用
  • 第一
  • 显示

这些方法中的每一个都将强制在DataFrame上执行,并将结果返回给Python运行时以供显示/使用。这意味着Spark将没有机会懒惰地找出最有效的方法来对数据进行计算,而将被迫在执行其他任何执行之前取回请求的数据。