问题描述
我使用的是 Spark 2.4.7。我希望它在读取 Parquet 文件时修剪一些列。但我没有设法强调这种行为。
这是我的测试代码:
import org.apache.spark.sql.{SaveMode,SparkSession}
import scala.io.StdIn
object Main {
def main(args: Array[String]): Unit = {
val path = "/tmp/spark-test"
val spark = SparkSession.builder().master("local[2]").getorCreate()
import spark.implicits._
// Generate and save some data
(0 to 1000).map(i =>
Full(Some(i),Some(i.formatted("%d")),Some((0 to i).map(j => j)))
).toDS().write.mode(SaveMode.Overwrite).format("parquet").save(path)
// Read the full dataset (as comparison baseline)
println("Reading full data")
val fullDs = spark.read.schema(spark.emptyDataset[Full].schema).parquet(path)
println(fullDs.count())
fullDs.explain(true)
// Try to read only one column
println("Reading partial data")
val partialDs = spark.read.schema(spark.emptyDataset[PartialInt].schema).parquet(path)
println(partialDs.count())
partialDs.explain(true)
// Leave the session alive to allow to browse to Spark UI
StdIn.readLine()
}
/** Possible fields */
trait Mixin {
val intF: Option[Int]
val strF: Option[String]
val arrayF: Option[Seq[Int]]
}
/** Canonical representation,implementing all fields. */
case class Full(intF: Option[Int],strF: Option[String],arrayF: Option[Seq[Int]]) extends Mixin
/** Partial representation,avoiding loading unnecessary columns */
case class PartialInt(intF: Option[Int]) extends Mixin {
override val strF: Option[String] = ???
override val arrayF: Option[Seq[Int]] = ???
}
}
在 spark UI 中,两个阅读阶段似乎具有相同的输入大小,并且我在输出的计划中看不到任何明确的提示。有没有办法明确突出列修剪机制?
亲切, 亚历克西斯。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)