Scala Spark RDD error: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD

Problem description

Spark version: 3.0.1, Scala version: 2.12.10

I am not from an English-speaking country, so please excuse any grammatical mistakes.

I am a beginner with Scala and Spark.

I want to draw a histogram showing the distribution of the data.


So I want to build a dataset by splitting the data into 10 bins and computing, for each bin, its start value, end value, and count.
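To make the intended computation concrete, this is essentially what the RDD histogram(buckets) API provides: given sorted bucket boundaries, it returns one count per bucket. The following is only a minimal, self-contained sketch with toy data (the object name, values, and local master are made up for illustration):

import org.apache.spark.sql.SparkSession

object HistogramSketch {
  def main(args: Array[String]): Unit = {
    // Toy local session, purely for illustrating the histogram API.
    val spark = SparkSession.builder().master("local[*]").appName("histogram-sketch").getOrCreate()
    val sc = spark.sparkContext

    val values = sc.parallelize(Seq(1.0, 2.5, 3.7, 8.2, 9.9, 15.0))
    val minV = values.min()
    val maxV = values.max()

    // 10 equal-width buckets need 11 boundaries from min to max.
    val edges = (0 to 10).map(i => minV + i * (maxV - minV) / 10).toArray

    // DoubleRDDFunctions.histogram(buckets) returns one count per bucket.
    val counts = values.histogram(edges)

    edges.zip(edges.tail).zip(counts).foreach { case ((from, to), n) =>
      println(f"$from%.2f to $to%.2f -> $n")
    }

    spark.stop()
  }
}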

However, the code below throws an error and the operation cannot be processed.

(1) SparkSession connection code

import org.apache.spark.sql.SparkSession

object SparkResource {

    implicit val oSparkOrigin = SparkSession.builder()
        .master("spark://centos-master:7077")
        // .master("local[*]")
        .appName("spark-api")
        .getOrCreate()

    implicit val oSpark = SparkSession.active
}
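For reference, when the driver connects to a remote standalone master like this, the application jar usually also has to be made available to the executors so that classes and lambdas defined in the application can be deserialized on the worker side. A minimal sketch of one way to do that from the builder; the jar path here is a hypothetical placeholder, not something from the original project:

import org.apache.spark.sql.SparkSession

object SparkResourceWithJars {

    // Sketch only: ships the packaged application jar to the executors via spark.jars.
    // "/opt/app/spark-api-test.jar" is a hypothetical path used for illustration.
    implicit val oSparkOrigin = SparkSession.builder()
        .master("spark://centos-master:7077")
        .appName("spark-api")
        .config("spark.jars", "/opt/app/spark-api-test.jar")
        .getOrCreate()
}

The same jar list can also be supplied outside the code, for example via spark-submit or the spark.jars configuration entry; the snippet above only illustrates the builder-level option.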

(2) Code that uses RDD functions (this is where the error occurs)

  def handleDataFromHDFS(
      pSpark: SparkSession,pFilename: String,pFiletype: String,pFileseq: String,pUserno: String,pGroupId: String,pJobId: String)(implicit mc: MarkerContext): Future[ResponseResource] = {

    require(pFiletype != null, "filetype must not be null.")
    require(pFileseq != null, "pFileseq must not be null.")

    apiService.serviceDataFromHDFS(pSpark,pFilename,pFiletype,pFileseq,pUserno).map { sData =>


      // pSpark == SparkResource.oSpark (SparkSession)
      // sData == Dataset[Row]

      // table name
      val sName = pUserno + "_" + pGroupId + "_" + pJobId
      // Temp View Create
      sData.createOrReplaceTempView(sName)
      // cache
      pSpark.sqlContext.cacheTable(sName) 


      sData.show()

      // +----------+------+----------+----------+----------+----------+---------+
      // |      date|symbol|      open|     close|       low|      high|   volume|
      // +----------+------+----------+----------+----------+----------+---------+
      // |2016-01-05|  WLTW|    123.43|125.839996|122.309998|    126.25|2163600.0|
      // |2016-01-06|  WLTW|125.239998|119.980003|119.940002|125.540001|2386400.0|
      // |2016-01-07|  WLTW|116.379997|114.949997|    114.93|119.739998|2489500.0|
      // |2016-01-08|  WLTW|115.480003|116.620003|     113.5|117.440002|2006300.0|
      // |2016-01-11|  WLTW|117.010002|114.970001|114.089996|117.330002|1408600.0|
      // |2016-01-12|  WLTW|115.510002|115.550003|     114.5|116.059998|1098000.0|
      // |2016-01-13|  WLTW|116.459999|112.849998|112.589996|    117.07| 949600.0|
      // |2016-01-14|  WLTW|113.510002|114.379997|110.050003|115.029999| 785300.0|
      // |2016-01-15|  WLTW|113.330002|112.529999|111.919998|114.879997|1093700.0|
      // |2016-01-19|  WLTW|113.660004|110.379997|109.870003|115.870003|1523500.0|
      // |2016-01-20|  WLTW|109.059998|109.300003|    108.32|111.599998|1653900.0|
      // |2016-01-21|  WLTW|109.730003|     110.0|    108.32|110.580002| 944300.0|
      // |2016-01-22|  WLTW|111.879997|111.949997|110.190002|112.949997| 744900.0|
      // |2016-01-25|  WLTW|    111.32|110.120003|     110.0|114.629997| 703800.0|
      // |2016-01-26|  WLTW|110.419998|     111.0|107.300003|111.400002| 563100.0|
      // |2016-01-27|  WLTW|110.769997|110.709999|109.019997|    112.57| 896100.0|
      // |2016-01-28|  WLTW|110.900002|112.580002|109.900002|112.970001| 680400.0|
      // |2016-01-29|  WLTW|113.349998|114.470001|111.669998|114.589996| 749900.0|
      // |2016-02-01|  WLTW|     114.0|     114.5|112.900002|114.849998| 574200.0|
      // |2016-02-02|  WLTW|    113.25|110.559998|    109.75|113.860001| 694800.0|
      // +----------+------+----------+----------+----------+----------+---------+

      import org.apache.spark.sql.functions.{col, column, expr, max, min}
      import pSpark.implicits._


      // min/max of the "open" column; these bound the histogram range
      val sMinMax_df = sData.agg(max($"open"), min($"open")).head()
      val sMaxValue = sMinMax_df(0).toString
      val sMinValue = sMinMax_df(1).toString

      println("=sMaxValue=")
      println(sMaxValue)
      println("=sMinValue=")
      println(sMinValue)
      

      // 11 bucket boundaries: 10 equal-width steps from min to max, with the exact max appended
      val thresholds: Array[Double] =
        (sMinValue.toDouble until sMaxValue.toDouble by (sMaxValue.toDouble - sMinValue.toDouble) / 10).toArray ++ Array(sMaxValue.toDouble)
      thresholds.foreach(x => println(x))

      // 1.66
      // 159.9379941
      // 318.2159882
      // 476.4939823
      // 634.7719764
      // 793.0499705
      // 951.3279646
      // 1109.6059587000002
      // 1267.8839528
      // 1426.1619469
      // 1584.439941


      // Convert the "open" column to an RDD[Double] and compute per-bucket counts.
      // DoubleRDDFunctions.histogram(buckets) expects sorted boundaries and returns
      // one count per bucket (thresholds.length - 1 values).
      // This is the call that fails on the cluster (see the exception in (5)).
      val _tmpHist = sData
          .select($"open" cast "double")
          .rdd.map(r => r.getDouble(0))
          .histogram(thresholds)
    


      // The resulting DataFrame contains the `from`/`to` range and the bucket count `value`.
      val histogram = pSpark.sparkContext
        .parallelize((thresholds, thresholds.tail, _tmpHist).zipped.toList)
        .toDF("from", "to", "value")

      // histogram.show()
      // +------------------+------------------+------+
      // |              from|                to| value|
      // +------------------+------------------+------+
      // |              1.66|       159.9379941|811608|
      // |       159.9379941|       318.2159882| 28881|
      // |       318.2159882|       476.4939823|  4959|
      // |       476.4939823|       634.7719764|  2883|
      // |       634.7719764|       793.0499705|  1834|
      // |       793.0499705|       951.3279646|   257|
      // |       951.3279646|1109.6059587000002|   120|
      // |1109.6059587000002|      1267.8839528|   396|
      // |      1267.8839528|      1426.1619469|   237|
      // |      1426.1619469|       1584.439941|    89|
      // +------------------+------------------+------+

      ResponseResource("select",Json.toJson(sData.limit(20).toJSON.collect()),Json.parse(sData.schema.json)("fields"),sName,sData.count(),0)
    }
  }
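As a side note on the histogram step itself: the same per-bucket counts can also be computed purely with DataFrame operations, without dropping to the RDD API. The following is only a hedged sketch of that alternative; the helper name histogramDF, its signature, and the default of 10 buckets are my own choices, not anything from the code above:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, floor, least, lit, max, min}

object DataFrameHistogram {

  // Hypothetical helper: equal-width histogram of a numeric column computed with
  // DataFrame operations only, i.e. no .rdd and no user closures that have to be
  // serialized to the executors.
  def histogramDF(df: DataFrame, colName: String, numBuckets: Int = 10): DataFrame = {
    val values = df.select(col(colName).cast("double").as(colName))
    val Row(minV: Double, maxV: Double) =
      values.agg(min(col(colName)), max(col(colName))).head()
    val width = (maxV - minV) / numBuckets

    values
      .select(
        // floor((x - min) / width) is the bucket index; least(...) clamps the maximum
        // value into the last bucket instead of producing one extra bucket.
        least(floor((col(colName) - lit(minV)) / lit(width)), lit(numBuckets - 1)).as("bucket"))
      .groupBy("bucket")
      .count()
      .withColumn("from", lit(minV) + col("bucket") * lit(width))
      .withColumn("to", lit(minV) + (col("bucket") + lit(1)) * lit(width))
      .select("from", "to", "count")
      .orderBy("from")
      // Note: buckets that contain no rows simply do not appear in this output.
  }
}

Under these assumptions it would be called as something like histogramDF(sData, "open") and would produce the same from/to/count shape as the histogram DataFrame built above.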

(3) sbt


name := """spark-api-test"""
organization := "com.baeldung"

version := "1.0-SNAPSHOT"

lazy val root = (project in file(".")).enablePlugins(PlayScala)

scalaVersion := "2.12.10"

resolvers += "jitpack" at "https://jitpack.io"


libraryDependencies += guice


libraryDependencies += "org.scalatestplus.play" %% "scalatestplus-play" % "5.0.0" % Test
libraryDependencies += "MysqL" % "mysql-connector-java" % "5.1.41"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.0.1"
libraryDependencies +=     "org.joda" % "joda-convert" % "2.2.1"
libraryDependencies +=     "net.logstash.logback" % "logstash-logback-encoder" % "6.2"
libraryDependencies +=     "io.lemonlabs" %% "scala-uri" % "1.5.1"
libraryDependencies +=     "net.codingwell" %% "scala-guice" % "4.2.6"
libraryDependencies +=  "com.crealytics" %% "spark-excel" % "0.13.6"
libraryDependencies += "com.github.shin285" % "KOMORAN" % "3.3.4"

// hdfs save lib
libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "1.0.0"




// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.10"


addCompilerPlugin("com.github.aoiroaoino" %% "totuple" % "0.1.2")


enablePlugins(JavaAppPackaging)

(4) The failing code: the error appears once .histogram is added.

val _tmpHist = sData
          .select($"open" cast "double")
          .rdd.map(r => r.getDouble(0))
          .histogram(thresholds)

(5) Exception

Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 12, 192.168.0.220, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2410)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2295)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:

If the master in the SparkSession is set to local[*], everything works fine.

implicit val oSparkOrigin = SparkSession.builder()
        .master("local[*]")
        .appName("spark-api")
        .getOrCreate()

But I have to use the Spark master URL.

I have been struggling with this for several days. Please tell me a solution.
