问题描述
trait SparkBaseAgg[V] {
type K
def getKeys: K
def reduce(other: V): V
}
case class ABC(
customerId: Long,count: Long
) extends SparkBaseAgg[ABC] {
override type K = Long
override def getKeys: K = customerId
override def reduce(other: ABC): ABC = {
// return updated
}
}
我想像这样减少Dataset[ABC];
def reduce[V <: SparkBaseAgg[V]: Encoder: classtag](data: Dataset[V]): Dataset[V] = {
import data.sparkSession.implicits._
data
.groupByKey(_.getKeys)
.reduceGroups((first,second) => first.reduce(second))
.map { case (_,value) => value }
}
火花塞
错误:(20,18) 无法找到 V#K 类型的编码器。隐含的 需要编码器 [V#K] 将 V#K 实例存储在数据集中。原始 类型(Int、String 等)和产品类型(案例类)是 支持导入 spark.implicits._ 支持序列化 其他类型将在未来版本中添加。 .groupByKey(_.getKeys)
我该如何解决?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)