尝试同时使用udf和to_json时获取“任务无法序列化:java.io.NotSerializableException”

问题描述

我一直试图找出确切的问题出在哪里,但无法做到。尝试也遵循类似UDF to generate JSON string behaving inconsistently方法,但仍然无法理解问题。

下面是我的代码段,

    val writingDataset = sparkSession
      .readStream
      .format("kafka")
      .option(kafkaBootstrapServers,urls)
      .option("subscribe",inputTopics)
      .option("startingOffsets","earliest")
      .load()
      .selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
//      .withColumn("value",parser.parseUDF('value).as("value")) //combination of this two line doesn't work either
//      .withColumn("value",to_json('value).as("value")) //combination of this two line doesn't work either
      .select(col("key"),to_json(parser.parseUDF('value)).as("value"))
      .writeStream
        .format("console")
        .start()

    writingDataset.awaitTermination

下面是我的udf代码

    val parse = (value: String) => {
    Some(CompanyDetail("something","something"))
  }

  import org.apache.spark.sql.functions.udf
  val parseUDF = udf(parse)
  val keyUDF = udf(keyParse)

不确定这里发生了什么,但我不断收到以下错误消息

org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.WritetoDataSourceV2Exec.doExecute(WritetoDataSourceV2Exec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$7251/0000000000000000.apply(UnkNown Source)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$7280/0000000000000000.apply(UnkNown Source)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2782)
    at org.apache.spark.sql.Dataset$$Lambda$7166/000000006C38DB10.apply(UnkNown Source)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset$$Lambda$7169/000000006C38F210.apply(UnkNown Source)
    at org.apache.spark.sql.execution.sqlExecution$.$anonfun$withNewExecutionId$1(sqlExecution.scala:78)
    at org.apache.spark.sql.execution.sqlExecution$$$Lambda$7140/000000006C25F080.apply(UnkNown Source)
    at org.apache.spark.sql.execution.sqlExecution$.withsqlConfPropagated(sqlExecution.scala:125)
    at org.apache.spark.sql.execution.sqlExecution$.withNewExecutionId(sqlExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2782)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:540)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$7136/000000006C25E1B0.apply(UnkNown Source)
    at org.apache.spark.sql.execution.sqlExecution$.$anonfun$withNewExecutionId$1(sqlExecution.scala:78)
    at org.apache.spark.sql.execution.sqlExecution$$$Lambda$7140/000000006C25F080.apply(UnkNown Source)
    at org.apache.spark.sql.execution.sqlExecution$.withsqlConfPropagated(sqlExecution.scala:125)
    at org.apache.spark.sql.execution.sqlExecution$.withNewExecutionId(sqlExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:536)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$7135/000000006C25DA80.apply(UnkNown Source)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTiMetaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTiMetaken$(ProgressReporter.scala:349)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTiMetaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$6895/000000006C02DE80.apply$mcV$sp(UnkNown Source)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTiMetaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTiMetaken$(ProgressReporter.scala:349)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTiMetaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$6893/000000006C02CF10.apply$mcZ$sp(UnkNown Source)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.runtime.LazyRef

解决方法

我自己弄清楚了。火花代码没有任何问题。 Scala版本有问题。一旦我将scala version降级为2.11.0

,它就起作用了