问题描述
我正在尝试将项目从Spark 1.6迁移到Spark 2.4.5,但发现有关此主题的一些问题。 我正在使用Scala 2.11.12和Scala Maven插件4.4.0作为编译器插件。
实现:
class AddServiceDaoImpl(spark: SparkSession,configuration: Configuration) extends AddServiceDao {
import spark.implicits._
implicit val sqlc = spark.sqlContext
val generalOptions = Map of stuff...
override def findByXXX(key: RDD[(String,String)]): RDD[AddService] = {
if (key.isEmpty) return spark.sparkContext.emptyRDD[AddService]
val sql = "this is a String"
val df = sqlc
.read
.format("phoenix")
.options(generalOptions)
.option("query",sql)
.load()
val keysDf = key.toDF(Seq("column1","column2"): _*)
keysDf.join(df,Seq("column1","column2")).as[AddService].rdd
}
}
特质:
trait AddServiceDao extends Dao[AddService] {
def findByXXX(key: RDD[(String,String)]): RDD[AddService]
}
还有一个案例类:
case class AddService( firstField: String,secondField: String,and so on for 10 fields..)
现在再添加几个元素:
我想念什么?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)