Spark - Scala 泛型类型,编码器错误

问题描述

trait SparkBaseAgg[V] {
  type K

  def getKeys: K
  def reduce(other: V): V
}

case class ABC(
    customerId: Long,count: Long
) extends SparkBaseAgg[ABC] {

  override type K = Long

  override def getKeys: K = customerId

  override def reduce(other: ABC): ABC = {
    // return updated
  }
}

我想像这样减少Dataset[ABC];

def reduce[V <: SparkBaseAgg[V]: Encoder: classtag](data: Dataset[V]): Dataset[V] = {
    import data.sparkSession.implicits._

    data
      .groupByKey(_.getKeys)
      .reduceGroups((first,second) => first.reduce(second))
      .map { case (_,value) => value }
  }

火花塞

错误:(20,18) 无法找到 V#K 类型的编码器。隐含的 需要编码器 [V#K] 将 V#K 实例存储在数据集中。原始 类型(Int、String 等)和产品类型(案例类)是 支持导入 spark.implicits._ 支持序列化 其他类型将在未来版本中添加。 .groupByKey(_.getKeys)

我该如何解决

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)