scala – 在Spark中将镶木地板文件加载到case类中的性能

我正在评估在Spark中加载Parquet文件的不同方法的性能,差异是惊人的.

在我们的Parquet文件中,我们有类似的嵌套case类:

case class C(/* a dozen of attributes*/)
case class B(/* a dozen of attributes*/,cs: Seq[C])
case class A(/* a dozen of attributes*/,bs: Seq[B])

从Parquet文件加载它们需要一段时间.
所以我已经完成了从Parquet文件加载案例类的不同方法的基准,并使用Spark 1.6和2.0对一个字段求​​和.

以下是我做的基准测试的总结:

val df: DataFrame = sqlContext.read.parquet("path/to/file.gz.parquet").persist()
df.count()

// Spark 1.6

// Play Json
// 63.169s
df.toJSON.flatMap(s => Try(Json.parse(s).as[A]).toOption)
         .map(_.fieldToSum).sum()

// Direct access to field using Spark Row
// 2.811s
df.map(row => row.getAs[Long]("fieldToSum")).sum()

// Some small library we developed that access fields using Spark Row
// 10.401s
df.toRDD[A].map(_.fieldToSum).sum()

// Dataframe hybrid SQL API
// 0.239s
df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)

// Dataset with RDD-style code
// 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)

// Dataset with column selection
// 0.176s
df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)


// Spark 2.0

// Performance is similar except for:

// Direct access to field using Spark Row
// 23.168s
df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)

// Some small library we developed that access fields using Spark Row
// 32.898s
f1DF.toRDD[A].map(_.fieldToSum).sum()

我理解为什么升级到Spark 2.0时使用Spark Row的方法的性能会降低,因为Dataframe现在只是Dataset [Row]的别名.
我想这就是统一接口的成本.

另一方面,我对数据集的承诺没有得到保留感到非常失望:使用RDD样式编码(map和flatMaps)时的性能比使用类似SQL的DSL的数据集这样的数据集更糟.

基本上,为了获得良好的性能,我们需要放弃类型安全.

>用作RDD的数据集和用作数据帧的数据集之间存在这种差异的原因是什么?
>有没有办法提高数据集中的编码性能,以等同RDD风格的编码和SQL风格的编码性能?对于数据工程,使用RDD样式编码要清晰得多.
>此外,使用类似SQL的DSL需要展平我们的数据模型,而不是使用嵌套的case类.我是对的,只有平面数据模型才能实现良好的性能?

解决方法

What is the reason for such difference between Dataset used as RDD and Dataset used as Dataframe?

为了获得一些见解,让我们考虑一下Spark SQL使用的优化.据我所知,有三种类型的改进优于普通RDD:

>执行计划优化(投影和选择下推,常数折叠),
>本机(堆外)内存使用和高效的列式缓存格式,
>代码生成.

现在问题是并非所有这些技术在受限制的编程模型(如SQL)之外都是有用的.

例如,可以下推选择(过滤器),但投影非常有限(你不能真正拥有对象的一部分,可以吗?).类似地,代码生成依赖于定义良好的语义,并且通常不容易应用它(它基本上是一个生成可以通过JVM进一步优化的代码的编译器).

最后sun.misc.Unsafe是一种提高性能的惊人方式,但它不是免费提供的.虽然这里有很多增益,但编码和解码也有很大的开销.

working with the SQL-like DSL would require to flatten our data model and not use nested case classes.

嵌套结构并不完全是第一类公民,并且有一些记录不完的限制,你仍然可以在这里做很多.

performance of methods using Spark Row is degraded when upgrading to Spark
2.0,since Dataframe is now a mere alias of Dataset[Row]. That’s the cost of unifying the interfaces,I guess.

虽然有一些性能回归,但这两段代码根本不相同.在2.0.0中,DataFrame.map具有与1.x版本不同的签名.如果你想使这两者相当,你应该首先转换为RDD:

df.rdd.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...