将 Spark DataFrame 转换为 spark.rdd.RDD[(Array[Integer], Array[Integer]) 来计算平均精度

问题描述

我有一个 Spark 数据帧:

DataFrame

我必须使用 Spark 和 Scala 来计算 RankingMetrics 的平均精度。 我猜根据文档,我们必须使用 RDD 而不是 DataFrame。 我尝试了以下方法

var llist = df.select("predicted","actual").rdd.map(x => (x.get(0),x.get(1))).collect()
// It gave Array[(Any,Any)]

var df_rdd =sc.parallelize(llist)
// df_rdd is org.apache.spark.rdd.RDD[(Any,Any)]

val metrics = new RankingMetrics(df_rdd)
// This gave me an error

错误

错误:类型不匹配;
找到:org.apache.spark.rdd.RDD[(Any,Any)]
需要:org.apache.spark.rdd.RDD[(Array[?],Array[?])]
注意:(Any,Any) >: (Array[?],Array[?]),但是类RDD在类型T中是不变的。
您可能希望将 T 定义为 -T。 (SLS 4.5)

我使用的是 Spark 2.4.3 版

如何将此 DataFrame 转换为该格式,以便计算平均精度?谢谢。

解决方法

DataFrame 本质上是一个 RDD,而您的 DataFrame 的类型为 DataFrame[Array[Int],Array[Int]]。因此,根据您报告的类型,您应该能够将其直接传递给 new RankingMetrics(df.rdd())。未测试。

,

因为错误只是告诉您 RankingMetrics 的参数应该是类型

(数组[?],数组[?])

但是当您通过简单地输入 df.rdd 来检查您的 RDD 时,它会显示:

org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

所以基本上你必须将这种 Row 类型的数据转换为 Array 类型。一种方法是像这样映射它:

df.rdd.map(r=>(Array(r(0)),Array(r(1))))

这不是建议的方法。同样在您的用例中,不要从创建数据帧开始,而是创建一个具有所需类型数据的 RDD(在您的情况下(数组 [],数组 []))。此外,要从数据帧创建 RDD,您应该使用:

df.rdd