scala – Usage of serializable objects: Caused by: java.io.NotSerializableException

I followed this tutorial on task serialization, as well as other similar tutorials, but my code still fails with the task serialization error and I don't understand why. I set the variable topicOutputMessages outside of foreachRDD and then read it inside foreachPartition. I also create the KafkaProducer INSIDE foreachPartition. So what is the problem here? I really can't figure it out.

    val topicsSet = topicInputMessages.split(",").toSet
    val kafkaParams = Map[String,String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
    val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicsSet).map(_._2)


messages.foreachRDD(rdd => {
    rdd.foreachPartition{iter =>
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
        UtilsDM.setOutputTopic(topicOutputMessages)
        val producer = UtilsDM.createProducer
        iter.foreach { msg =>
              val record = new ProducerRecord[String,String](UtilsDM.getOutputTopic(),msg)
              producer.send(record)
        }
        producer.close()
    }
})

Edit:

import java.util.Properties

import org.apache.kafka.clients.producer.KafkaProducer

object UtilsDM extends Serializable {

  var topicOutputMessages: String = ""
  var metadataBrokerList: String = ""
  var producer: KafkaProducer[String,String] = null

  def setOutputTopic(t: String): Unit = {
    topicOutputMessages = t
  }

  def getOutputTopic(): String = topicOutputMessages

  def setMetadataBrokerList(m: String): Unit = {
    metadataBrokerList = m
  }

  def createProducer: KafkaProducer[String,String] = {

    val kafkaProps = new Properties()

    kafkaProps.put("bootstrap.servers", metadataBrokerList)

    // This is mandatory, even though we don't send a key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")

    // How many times to retry when a produce request fails
    kafkaProps.put("retries", "3")
    // Upper limit on how much data (in bytes) the producer will batch before sending
    kafkaProps.put("batch.size", "5")
    // How long the producer will wait before sending, to allow more messages to accumulate in the same batch
    kafkaProps.put("linger.ms", "5")

    new KafkaProducer[String,String](kafkaProps)
  }

}

Full stack trace:

16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer,value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1,name: $outer,type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1,<function1>)
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1,type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1,<function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer,<function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more

Solution

The serialization issue lies in how Spark handles closure serialization (you can read about it in detail in this answer: How spark handles object).

In the failing code, referencing metadataBrokerList and topicOutputMessages here:

rdd.foreachPartition{iter =>
    UtilsDM.setMetadataBrokerList(metadataBrokerList)
    UtilsDM.setOutputTopic(topicOutputMessages)

creates a reference to the outer object in which those variables were defined, and forces Spark's closure cleaner to include that outer object in the "cleaned" closure. The outer object then pulls sparkContext and streamingContext into the closure, and since they are not serializable you get the serialization exception.
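
For illustration, a minimal sketch of that capture chain (only the class name KafkaDecisionsConsumer comes from the stack trace; its constructor shape here is assumed):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical shape of the enclosing class seen in the stack trace.
class KafkaDecisionsConsumer(ssc: StreamingContext, metadataBrokerList: String) {
  def run(messages: DStream[String]): Unit = {
    messages.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // `metadataBrokerList` compiles to this.metadataBrokerList, so the task
        // closure captures $outer (the whole KafkaDecisionsConsumer instance),
        // which in turn holds the non-serializable StreamingContext.
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
        iter.foreach(println)
      }
    }
  }
}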

In the second attempt (the workaround posted as the answer), those links are broken because the variables now live in a helper object, and the closure can be "cut off" from the outer context.

I don't think adding @transient to those variables in the UtilsDM object is necessary, since the values are serializable. Be aware, though, that singleton objects are re-created in each executor, so the value of a mutable variable changed in the driver will not be available in the executors, which often leads to NullPointerException if not handled with care.
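
To make that executor-local behaviour concrete, here is a sketch of the failure mode (the getOutputTopic call mirrors the one in the question's first snippet; placing the setter on the driver is the hypothetical mistake):

// Driver side: this only mutates the UtilsDM instance living in the driver JVM.
UtilsDM.setOutputTopic(topicOutputMessages)

messages.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // Executor side: UtilsDM is initialized from scratch in this JVM, so
    // topicOutputMessages is back to "" (and producer is still null) unless
    // the setters are called again inside this closure; a typical source of
    // NullPointerException or silently wrong values.
    val topic = UtilsDM.getOutputTopic()
    iter.foreach(msg => println(s"$topic -> $msg"))
  }
}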

There is a serialization trick that would have helped in the original scenario:

Copy the referenced variables into locals within the closure, e.g.

rdd.foreachPartition{iter =>
    val innerMDBL  = metadataBrokerList
    val innerTOM = topicOutputMessages
    UtilsDM.setMetadataBrokerList(innerMDBL)
    UtilsDM.setOutputTopic(innerTOM)

This way, the values are copied into the closure when it is constructed, and there is no longer any link to the outer object.

To deal with executor-bound objects (like non-serializable connections or even local caches), I prefer to use an instance factory approach, as explained in this answer: Redis on Spark: Task not serializable
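
A minimal sketch of that approach, assuming a new helper object (here called KafkaSink, not part of the original code) that lazily creates one producer per executor JVM and reuses it across batches:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.mutable

object KafkaSink {
  // One producer per broker list, created lazily on the executor that first needs it.
  private val producers = mutable.Map.empty[String, KafkaProducer[String, String]]

  def producerFor(brokers: String): KafkaProducer[String, String] = synchronized {
    producers.getOrElseUpdate(brokers, {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      new KafkaProducer[String, String](props)
    })
  }
}

// Usage in the streaming job: combined with the copy trick above, the task
// closure captures only two strings and never the enclosing class.
messages.foreachRDD { rdd =>
  val brokers = metadataBrokerList
  val topic   = topicOutputMessages
  rdd.foreachPartition { iter =>
    val producer = KafkaSink.producerFor(brokers)
    iter.foreach(msg => producer.send(new ProducerRecord[String, String](topic, msg)))
  }
}

Because the producer is cached and reused, it is not closed per partition; if cleanup is needed, it can be tied to a JVM shutdown hook instead.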
