如何使用Scala将DataSet传递给在Apache Spark中接受DataFrame作为参数的函数?

问题描述

我在Scala中有一个用于Spark的库,其中包含许多功能一个示例是以下函数,用于合并具有不同列的两个数据框:

def appendDF(df2: DataFrame): DataFrame = {

  val cols1 = df.columns.toSeq
  val cols2 = df2.columns.toSeq

  def expr(sourceCols: Seq[String],targetCols: Seq[String]): Seq[Column] = {
    targetCols.map({
      case x if sourceCols.contains(x) => col(x)
      case y                           => lit(null).as(y)
    })
  }

  // both df's need to pass through `expr` to guarantee the same order,as needed for correct unions.
  df.select(expr(cols1,cols1): _*).union(df2.select(expr(cols2,cols1): _*))

}

我想将此功能(以及更多功能)用于Dataset[CleanRow],而不是DataFrames。 CleanRow一个简单的类,用于定义列的名称和类型。 我有根据的猜测是使用.toDF()方法将数据集转换为数据帧。但是,我想知道是否还有更好的方法

据我了解,Dataset和Dataframe之间应该没有太多区别,因为Dataset只是Dataframe [Row]。另外,我认为从Spark 2.x起,用于DF和DS的API已统一,因此我认为我可以互换使用它们,但这不是事实。

解决方法

如果可以更改签名:

import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset

def f[T](d: Dataset[T]): Dataset[T] = {d}

// You are able to pass a dataframe:
f(Seq(0,1).toDF()).show
// res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: int]

// You are also able to pass a dataset:
f(spark.createDataset(Seq(0,1)))
// res2: org.apache.spark.sql.Dataset[Int] = [value: int]

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...