I followed this tutorial and other similar tutorials on task serialization, but my code fails with a task serialization error and I don't understand why. I set the variable topicOutputMessages outside of foreachRDD and then read it inside foreachPartition. I also create the KafkaProducer INSIDE foreachPartition. So what is the problem here? I really can't figure it out.
```scala
val topicsSet = topicInputMessages.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)

messages.foreachRDD(rdd => {
  rdd.foreachPartition { iter =>
    UtilsDM.setMetadataBrokerList(metadataBrokerList)
    UtilsDM.setOutputTopic(topicOutputMessages)
    val producer = UtilsDM.createProducer
    iter.foreach { msg =>
      val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
      producer.send(record)
    }
    producer.close()
  }
})
```
EDIT:
```scala
object UtilsDM extends Serializable {

  var topicOutputMessages: String = ""
  var metadataBrokerList: String = ""
  var producer: KafkaProducer[String, String] = null

  def setOutputTopic(t: String): Unit = {
    topicOutputMessages = t
  }

  def setMetadataBrokerList(m: String): Unit = {
    metadataBrokerList = m
  }

  // accessor used by the streaming code above (not shown in the original paste)
  def getOutputTopic(): String = topicOutputMessages

  def createProducer: KafkaProducer[String, String] = {
    val kafkaProps = new Properties()
    kafkaProps.put("bootstrap.servers", metadataBrokerList)
    // This is mandatory, even though we don't send a key
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("acks", "1")
    // How many times to retry when a produce request fails
    kafkaProps.put("retries", "3")
    // Upper limit of how many messages the producer will attempt to batch before sending (bytes)
    kafkaProps.put("batch.size", "5")
    // How long the producer waits before sending, to allow more messages to accumulate in the same batch
    kafkaProps.put("linger.ms", "5")
    new KafkaProducer[String, String](kafkaProps)
  }
}
```
Full stack trace:
```
16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
	at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
	at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
	- object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: org.test.consumer.kafka.KafkaDecisionsConsumer@4a0ee025)
	- field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer)
	- object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>)
	- field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
	- object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
	... 30 more
16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
	at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103)
	at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
	- object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
	... 30 more
```
Solution
The serialization issue lies in how Spark handles closure serialization (which you can read about in detail in this answer: How spark handles object).
In the failing code, referencing metadataBrokerList and topicOutputMessages here:
```scala
rdd.foreachPartition { iter =>
  UtilsDM.setMetadataBrokerList(metadataBrokerList)
  UtilsDM.setOutputTopic(topicOutputMessages)
```
creates a reference to the outer object in which those variables were created, and forces Spark's closure cleaner to include that outer object in the "cleaned" closure. The outer object in turn holds the sparkContext and the streamingContext, which are not serializable, and hence the serialization exception.
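For illustration, here is a hypothetical reconstruction of the enclosing class (the class name comes from the stack trace, but its constructor and fields are my assumption) that shows how the capture happens:

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sketch: KafkaDecisionsConsumer is the name from the stack
// trace, but its actual shape is assumed here.
class KafkaDecisionsConsumer(val ssc: StreamingContext, // not serializable
                             metadataBrokerList: String,
                             topicOutputMessages: String) {

  def run(messages: DStream[String]): Unit = {
    messages.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // `metadataBrokerList` compiles to `this.metadataBrokerList`, so the
        // serialized closure keeps a $outer reference to the whole consumer
        // instance -- and with it the non-serializable StreamingContext.
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
        UtilsDM.setOutputTopic(topicOutputMessages)
      }
    }
  }
}
```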
In the second attempt (in the workaround posted as an answer), those links are broken, because the variables are now contained in a helper object and the closure can be "cut clean" from the outer context.
I'd think that adding @transient to these variables within the UtilsDM object is not necessary, given that the values are serializable. Be aware that singleton objects are re-created in each executor, so the value of any mutable variable changed in the driver will not be available in the executors, which often leads to NullPointerException if not handled properly.
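A minimal sketch of that pitfall (all names here are illustrative; note that in local mode the driver and executors share one JVM, so it only shows up on a real cluster):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Settings {
  var outputTopic: String = _ // only ever mutated on the driver
}

object SingletonPitfall {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("singleton-pitfall"))
    Settings.outputTopic = "decisions" // takes effect in the driver JVM only

    sc.parallelize(1 to 4).foreachPartition { _ =>
      // Each executor re-instantiates the Settings singleton from its class
      // file, so outputTopic is still null here: a latent NullPointerException.
      println(s"outputTopic on executor: ${Settings.outputTopic}")
    }
    sc.stop()
  }
}
```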
There is a serialization trick that would have helped in the original scenario: copy the referenced variables within the closure. For example:
```scala
rdd.foreachPartition { iter =>
  val innerMDBL = metadataBrokerList
  val innerTOM = topicOutputMessages
  UtilsDM.setMetadataBrokerList(innerMDBL)
  UtilsDM.setOutputTopic(innerTOM)
```
That way, the values are copied at compile time and there is no longer any link to the outer object.
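A fuller sketch of the same trick, reusing the names from the question; here I take the copies in the foreachRDD body, i.e. before the closure that Spark actually serializes:

```scala
messages.foreachRDD { rdd =>
  // Plain local vals: captured by value, no $outer reference needed.
  val innerMDBL = metadataBrokerList
  val innerTOM = topicOutputMessages
  rdd.foreachPartition { iter =>
    UtilsDM.setMetadataBrokerList(innerMDBL)
    UtilsDM.setOutputTopic(innerTOM)
    val producer = UtilsDM.createProducer
    iter.foreach { msg =>
      producer.send(new ProducerRecord[String, String](innerTOM, msg))
    }
    producer.close()
  }
}
```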
To handle executor-bound objects, such as non-serializable connections or even local caches, I prefer to use an instance factory approach, as explained in this answer: Redis on Spark: Task not serializable
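For reference, a minimal sketch of such a factory (the names are mine, not from the linked answer): the producer lives in a singleton that each executor initializes lazily on first use, so the non-serializable connection never needs to cross the wire:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

object ProducerFactory {
  // One producer per executor JVM: created on first use, never serialized.
  private var producer: KafkaProducer[String, String] = _

  def getOrCreate(brokers: String): KafkaProducer[String, String] = synchronized {
    if (producer == null) {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      producer = new KafkaProducer[String, String](props)
      sys.addShutdownHook(producer.close()) // flush pending records on JVM exit
    }
    producer
  }
}
```

Inside foreachPartition you would then call ProducerFactory.getOrCreate(innerMDBL) and skip the per-batch producer.close(), reusing a single connection per executor instead of opening one for every partition of every batch.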