尝试在 SPARK (Scala)

问题描述

我正在学习 Spark 中的基本教程,其中要求我们将 .csv 文件导入数据库,执行查询并收集该查询的结果。

为此,我们首先创建一个 Persona 案例类来包含数据,以及一个将数据集的每一行格式化为 Persona 对象的函数,如下所示:

case class Persona(ID:Int,nombre:String,edad:Int,numeroAmigos:Int)

def procesarPersona(linea:String):Persona = {
    val campos = linea.split(",")
    val persona = Persona(campos(0).toInt,campos(1),campos(2).toInt,campos(3).toInt)
    return persona
}

然后,我们创建 spark sql 会话并从 .csv 文件中导入数据,如下所示:

val spark = SparkSession.builder.appName("Personas").getorCreate()

val lineas = spark.sparkContext.textFile("file:///home/Eduardo/Downloads/friends.csv")

然后我们用函数映射每一行来处理每一行:

val personas = lineas.map(procesarPersona) //DATAFRAME RESULTADO

接下来,我们使用以下命令将 Dataframe 转换为 sql 数据库


val estructuraDatos = personas.toDS

estructuraDatos.printSchema

estructuraDatos.createOrReplaceTempView("personas")

然后我执行 sql 查询,并尝试收集数据,

val mayoresEdad = spark.sql("SELECT * FROM personas WHERE edad >= 18")

val resultados = mayoresEdad.collect()

问题来了,前面的所有步骤都与视频中显示的结果相匹配,但是在执行查询后,我无法收集结果而不会出现以下错误

我得到的错误如下:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 Failed 1 times,most recent failure: Lost task 0.0 in stage 4.0 (TID 8,192.168.1.43,executor driver): java.lang.classCastException: class Persona cannot be cast to class Persona (Persona is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @5337f0dc; Persona is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @2229c7c)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(UnkNown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

我已经检查了我的 spark 版本,我是对的,我可能与我的教授在 mac 上使用以前的 scala 版本无关,但我怀疑这就是原因。

错误可能是什么有任何想法吗?

我使用的是 Spark 3.0.2、ubuntu 20.04 和 Zeppelin 0.8.9-bin-all。

感谢您的帮助!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)