问题描述
我正在学习 Spark 中的基本教程,其中要求我们将 .csv 文件导入数据库,执行查询并收集该查询的结果。
为此,我们首先创建一个 Persona 案例类来包含数据,以及一个将数据集的每一行格式化为 Persona 对象的函数,如下所示:
case class Persona(ID:Int,nombre:String,edad:Int,numeroAmigos:Int)
def procesarPersona(linea:String):Persona = {
val campos = linea.split(",")
val persona = Persona(campos(0).toInt,campos(1),campos(2).toInt,campos(3).toInt)
return persona
}
然后,我们创建 spark sql 会话并从 .csv 文件中导入数据,如下所示:
val spark = SparkSession.builder.appName("Personas").getorCreate()
val lineas = spark.sparkContext.textFile("file:///home/Eduardo/Downloads/friends.csv")
然后我们用函数映射每一行来处理每一行:
val personas = lineas.map(procesarPersona) //DATAFRAME RESULTADO
接下来,我们使用以下命令将 Dataframe 转换为 sql 数据库:
val estructuraDatos = personas.toDS
estructuraDatos.printSchema
estructuraDatos.createOrReplaceTempView("personas")
val mayoresEdad = spark.sql("SELECT * FROM personas WHERE edad >= 18")
val resultados = mayoresEdad.collect()
问题来了,前面的所有步骤都与视频中显示的结果相匹配,但是在执行查询后,我无法收集结果而不会出现以下错误。
我得到的错误如下:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 Failed 1 times,most recent failure: Lost task 0.0 in stage 4.0 (TID 8,192.168.1.43,executor driver): java.lang.classCastException: class Persona cannot be cast to class Persona (Persona is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @5337f0dc; Persona is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @2229c7c)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(UnkNown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
我已经检查了我的 spark 版本,我是对的,我可能与我的教授在 mac 上使用以前的 scala 版本无关,但我怀疑这就是原因。
对错误可能是什么有任何想法吗?
我使用的是 Spark 3.0.2、ubuntu 20.04 和 Zeppelin 0.8.9-bin-all。
感谢您的帮助!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)