How to encode a Dataset type from a generic type parameter defined in a trait -- scala.ScalaReflectionException: type K is not a class

Problem Description

I have an input Dataset[Input] on which I want to run a transformation. The requirement is that this transformation can have multiple implementations, each of which must return a Dataset[Layout], where the Key and Value parameters of the Layout class differ per implementation.

I tried creating a trait that takes type parameters K and V and declares the transformation function, then classes that implement this trait, as shown in the code below.

The problem is that inside Dataset.transform, the encoder cannot resolve the Product types bound to the type parameters K and V. What is the correct way to implement this requirement?

import scala.language.existentials
import org.apache.spark.sql.{Dataset, Encoders}

// Given
case class Input(key: String, id: String, name: String, value: String, Metadata: String)

trait ExportLayout[K <: Product, V <: Product] {
  case class Layout(Key: K, Value: V)
  def transformFn(inputDS: Dataset[Input]): Dataset[Layout]
}

object DefaultLayout {
  case class Key(id: String, key: String, name: String)
  case class Value(value: String, Metadata: String)
}
case class DefaultLayout() extends ExportLayout[DefaultLayout.Key, DefaultLayout.Value] {
  import DefaultLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout] = {
    inputDS.map { row =>
      Layout(Key(row.id, row.key, row.name), Value(row.value, row.Metadata))
    }(Encoders.product[Layout]) // fails at runtime: type K is not a class
  }
}

object AnotherLayout {
  case class Key(???)
  case class Value(???)
}
case class AnotherLayout() extends ExportLayout[AnotherLayout.Key, AnotherLayout.Value] {
  import AnotherLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout] = {
    inputDS.map { row => Layout(Key(???), Value(???)) }(Encoders.product[Layout])
  }
}

// Test
val rows = Seq(
  Input("111","cn","A","10","a"),Input("222","B","20","b")
)
val ds = spark.createDataset(rows)(Encoders.product[Input])
val layoutDS = ds.transform(DefaultLayout().transformFn)

/* throws error:
type K is not a class
scala.ScalaReflectionException: type K is not a class
    at scala.reflect.api.Symbols$SymbolApi$class.asClass(Symbols.scala:275)
    at scala.reflect.internal.Symbols$SymbolContextApiImpl.asClass(Symbols.scala:84)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:707)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:91)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:72)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:71)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:639)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
    at DefaultLayout.transformFn(ExportLayout.scala:_)
*/
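
The trace bottoms out in Spark's ScalaReflection, which walks the TypeTag behind Encoders.product[Layout]. Because Layout is declared inside the trait, its constructor parameters are recorded with the abstract types K and V, and the concrete arguments supplied by extends ExportLayout[DefaultLayout.Key, DefaultLayout.Value] are not substituted in, so getClassFromType is handed the type symbol K rather than a class. The same limitation can be seen with plain Scala reflection, independent of Spark (a minimal illustration; Outer, Inner, and ConcreteOuter are hypothetical names):

import scala.reflect.runtime.universe._

trait Outer[K] {
  case class Inner(key: K)
}
object ConcreteOuter extends Outer[String]

// The declared signature of Inner's field still refers to the abstract
// type K, not String; resolving it needs an asSeenFrom-style substitution
// that the encoder derivation does not perform for enclosing type parameters.
val keySym = typeOf[ConcreteOuter.Inner].decl(TermName("key"))
println(keySym.typeSignature) // prints the abstract type K, not String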

Workaround

No confirmed fix was recorded with the original post.

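One direction commonly suggested for this exact error (a hedged sketch, not confirmed by the original poster) is to lift Layout out of the trait and make it generic. The encoder is then derived at a call site where K and V are already concrete, top-level case classes, so the TypeTag no longer contains abstract type members. Reusing Input and the object DefaultLayout { Key, Value } definitions from the question unchanged:

import org.apache.spark.sql.{Dataset, Encoders}

// Layout is now top-level and generic; its fields are typed directly by K and V.
case class Layout[K <: Product, V <: Product](Key: K, Value: V)

trait ExportLayout[K <: Product, V <: Product] {
  def transformFn(inputDS: Dataset[Input]): Dataset[Layout[K, V]]
}

case class DefaultLayout() extends ExportLayout[DefaultLayout.Key, DefaultLayout.Value] {
  import DefaultLayout._
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout[Key, Value]] = {
    // Key and Value are concrete classes here, so Encoders.product can derive
    // a TypeTag in which every field resolves to a real class.
    inputDS.map { row =>
      Layout(Key(row.id, row.key, row.name), Value(row.value, row.Metadata))
    }(Encoders.product[Layout[Key, Value]])
  }
}

val layoutDS = spark.createDataset(rows)(Encoders.product[Input])
  .transform(DefaultLayout().transformFn)

An alternative sometimes proposed is to keep the nested Layout and require each implementation to supply an implicit Encoder[Layout]; but case classes nested in a trait capture an outer instance, which Spark's encoder machinery tends to struggle with, so the top-level variant above is usually the safer choice.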