Problem description
I have an input Dataset[Input] on which I want to run a transformation function. The requirement is that this transformation can have multiple implementations, and each implementation must return a Dataset[Layout], where the Key and Value type parameters of the Layout class differ per implementation.
I tried defining a trait that takes type parameters K and V and declares the transformation function, plus classes that implement this trait, as in the code below.
The problem is that in the Dataset.transform call, the encoder cannot resolve a Product for the type parameters K and V. How should this requirement be implemented correctly?
import scala.language.existentials
import org.apache.spark.sql.{Dataset, Encoders}

// Given
case class Input(key: String, id: String, name: String, value: String, Metadata: String)

trait ExportLayout[K <: Product, V <: Product] {
  case class Layout(Key: K, Value: V)
  def transformFn(inputDS: Dataset[Input]): Dataset[Layout]
}

object DefaultLayout {
  case class Key(id: String, key: String, name: String)
  case class Value(value: String, Metadata: String)
}

case class DefaultLayout() extends ExportLayout[DefaultLayout.Key, DefaultLayout.Value] {
  import DefaultLayout._

  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout] = {
    inputDS.map { row =>
      Layout(Key(row.id, row.key, row.name), Value(row.value, row.Metadata))
    }(Encoders.product[Layout])
  }
}

object AnotherLayout {
  case class Key(???)
  case class Value(???)
}

case class AnotherLayout() extends ExportLayout[AnotherLayout.Key, AnotherLayout.Value] {
  import AnotherLayout._

  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout] = {
    inputDS.map { row => Layout(Key(???), Value(???)) }(Encoders.product[Layout])
  }
}
// Test
val rows = Seq(
  Input("111", "cn", "A", "10", "a"),
  Input("222", "cn", "B", "20", "b")
)
val ds = spark.createDataset(rows)(Encoders.product[Input])
val layoutDS = ds.transform(DefaultLayout().transformFn)
/* throws error:
type K is not a class
scala.ScalaReflectionException: type K is not a class
at scala.reflect.api.Symbols$SymbolApi$class.asClass(Symbols.scala:275)
at scala.reflect.internal.Symbols$SymbolContextApiImpl.asClass(Symbols.scala:84)
at org.apache.spark.sql.catalyst.ScalaReflection$.getClassFromType(ScalaReflection.scala:707)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:91)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:72)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:71)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:639)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
at DefaultLayout.transformFn(ExportLayout.scala:_)
*/
Solution
No verified solution to this problem has been found yet.
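The exception ("type K is not a class") suggests that Spark's ScalaReflection is inspecting Layout at its definition site, inside the trait, where the field types K and V are still abstract type parameters rather than concrete classes. One commonly suggested direction is to move Layout to the top level as a generic case class and carry TypeTag evidence for K and V, so that a product encoder can be derived once each implementation binds them to concrete case classes. The following is a minimal sketch of that idea, not a verified fix; it assumes Spark 2.x with Scala 2.11/2.12 and reuses Input and the Key/Value case classes from the question:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Layout is now a top-level generic case class; its fields become concrete
// once an implementation fixes K and V.
case class Layout[K <: Product, V <: Product](Key: K, Value: V)

// An abstract class is used because a trait cannot take context bounds.
// The TypeTag evidence for K and V is enough to derive a product encoder.
abstract class ExportLayout[K <: Product : TypeTag, V <: Product : TypeTag] {
  implicit val layoutEncoder: Encoder[Layout[K, V]] = Encoders.product[Layout[K, V]]
  def transformFn(inputDS: Dataset[Input]): Dataset[Layout[K, V]]
}

// Key and Value are the case classes from the question's DefaultLayout companion object.
case class DefaultLayout() extends ExportLayout[DefaultLayout.Key, DefaultLayout.Value] {
  import DefaultLayout._

  // layoutEncoder is in implicit scope, so Dataset.map picks it up without
  // an explicit Encoders.product call.
  override def transformFn(inputDS: Dataset[Input]): Dataset[Layout[Key, Value]] =
    inputDS.map { row =>
      Layout(Key(row.id, row.key, row.name), Value(row.value, row.Metadata))
    }
}

With this shape, the test call ds.transform(DefaultLayout().transformFn) should compile unchanged. An alternative with the same effect is to keep the trait and declare an abstract implicit Encoder[Layout[K, V]] member that each concrete implementation supplies.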