Spark cannot serialize lambda

Problem description

  • My program initially ran fine.
  • After I added a Spark BloomFilter, the job started failing after a few batches with Could not serialize lambda.
  • I tried registering the classes with Kryo, but that failed too. What should I do? Thanks.
conf.registerKryoClasses(Array(
  Class.forName("org.apache.spark.util.sketch.BloomFilterImpl"),
  Class.forName("org.apache.spark.util.sketch.BloomFilter"),
  Class.forName("org.apache.spark.util.sketch.BitArray"),
  Class.forName("com.jk.utils.KafkaSink"),
  Class.forName("com.jk.utils.KafkaSink$$anonfun$1"),
  Class.forName("org.apache.kafka.clients.producer.KafkaProducer"),
  Class.forName("org.apache.kafka.clients.producer.internals.RecordAccumulator"),
  Class.forName("org.apache.kafka.clients.ApiVersions"),
  Class.forName("org.json4s.JsonAST$JObject"),
  Class.forName("org.json4s.JsonAST$JArray"),
  Class.forName("org.json4s.JsonAST$JString"),
  Class.forName("org.json4s.JsonAST$JInt"),
  Class.forName("org.json4s.JsonAST$JBool"),
  Class.forName("org.json4s.JsonAST$JLong"),
  Class.forName("scala.math.BigInt"),
  Class.forName("java.math.BigInteger"),
  Class.forName("org.apache.kafka.common.metrics.KafkaMetric"),
  Class.forName("org.apache.kafka.common.metrics.Metrics"),
  Class.forName("org.apache.kafka.clients.producer.internals.BufferPool")
))
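
Two things worth noting about this attempt: registerKryoClasses only assigns compact class IDs, it does not make an otherwise unserializable object (such as a live KafkaProducer holding metrics lambdas) serializable; and it only takes effect when the Kryo serializer is actually enabled. A minimal configuration sketch, assuming conf is a plain SparkConf:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Kryo class registration only matters once Kryo is the active serializer.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optional: fail fast on unregistered classes while debugging.
  .set("spark.kryo.registrationRequired", "true")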

Main code

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.sketch.BloomFilter

val bf: Broadcast[BloomFilter] = sc.broadcast(BloomFilter.create(10000000L))

kafkaDstream.foreachRDD(rdd => {
  val filtered = rdd.filter(msg => {
    // drop messages the Bloom filter has already seen
    !bf.value.mightContainString(msg.value)
  })
  ...

  // save the filtered RDD to Kafka
})
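
For reference, BloomFilter.create also has an overload taking an explicit false-positive probability; the single-argument form used above falls back to Spark's default FPP. A minimal sketch (the 1% value is illustrative, not from the original code):

// Same broadcast, but with an explicit false-positive probability.
val bf = sc.broadcast(BloomFilter.create(10000000L, 0.01))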

Kafka send utility

import java.util.Properties
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  // Created lazily on first use, i.e. on the executor after deserialization.
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}


object KafkaSink {

  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: Properties): KafkaSink[K, V] =
    apply(config.toMap)

  def getKafkaProducer(sparkSession: SparkSession, conf: CommonConf): Broadcast[KafkaSink[String, String]] = {
    val kafkaProducerConfig = {
      val properties = new Properties()
      properties.setProperty("bootstrap.servers", conf.getOrElse("kafka.bootstrap.servers", ""))
      properties.setProperty("key.serializer", conf.getOrElse("kafka.keySerializer", "org.apache.kafka.common.serialization.StringSerializer"))
      properties.setProperty("value.serializer", conf.getOrElse("kafka.valueSerializer", "org.apache.kafka.common.serialization.StringSerializer"))
      properties
    }
    sparkSession.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
  }
}
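
The point of this pattern is that broadcasting a KafkaSink ships only the createProducer closure; the producer itself is built lazily, at most once per executor JVM, on the first send. A minimal usage sketch (kafkaSink, props, and the topic name are illustrative):

val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSink[String, String](props))

rdd.foreachPartition { partition =>
  // The first send on each executor triggers createProducer();
  // subsequent sends reuse the same producer instance.
  partition.foreach(msg => kafkaSink.value.send("output-topic", msg))
}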

Error log

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
metricValueProvider (org.apache.kafka.common.metrics.KafkaMetric)
metrics (org.apache.kafka.common.metrics.Metrics)
metrics (org.apache.kafka.clients.producer.internals.BufferPool)
free (org.apache.kafka.clients.producer.internals.RecordAccumulator)
accumulator (org.apache.kafka.clients.producer.KafkaProducer)
producer (com.jk.utils.MyKafkaUtils)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:241)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
    at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
    at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1519)
    at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
    at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1519)
    at org.apache.spark.storage.memory.MemoryStore.org$apache$spark$storage$memory$MemoryStore$$dropBlock$1(MemoryStore.scala:473)
    at org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$1.apply$mcVI$sp(MemoryStore.scala:499)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:490)
    at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:92)
    at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:73)
    at org.apache.spark.memory.UnifiedMemoryManager.acquireStorageMemory(UnifiedMemoryManager.scala:179)
    at org.apache.spark.memory.UnifiedMemoryManager.acquireUnrollMemory(UnifiedMemoryManager.scala:186)
    at org.apache.spark.storage.memory.MemoryStore.reserveUnrollMemoryForThisTask(MemoryStore.scala:552)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:229)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1176)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1167)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1102)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1167)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:925)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1492)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:240)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:211)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1329)
    ... 14 more
Caused by: java.lang.RuntimeException: Could not serialize lambda
    at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:69)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    ... 62 more
Caused by: java.lang.NoSuchMethodException: org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$$Lambda$123/1592295195.writeReplace()
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:60)
    ... 64 more
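
Reading the trace bottom-up: while freeing storage memory (evictBlocksToFreeSpace), Spark dropped the broadcast block holding the KafkaSink to disk, which meant re-serializing it with Kryo. By that point the lazy producer field had already been initialized, so Kryo walked into the live KafkaProducer and failed on an internal metrics lambda that has no writeReplace method. A hedged mitigation, assuming the KafkaSink class above, is to mark the field @transient so an initialized producer is never written out and is simply rebuilt on first use after deserialization:

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  // @transient keeps an initialized producer out of any re-serialization
  // (for example when the broadcast block is evicted to disk); after
  // deserialization it is recreated lazily by the first send.
  @transient lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
}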

Workaround

I think this should work for you, since you are broadcasting the object rather than a lambda function. It works for me on a DataStax Spark cluster.

import org.apache.spark.rdd.RDD

val kafkaSink = sc.broadcast(KafkaSink(kafkaOptionsProducer))
dstream
  .foreachRDD((rdd: RDD[String]) => {
    rdd.foreachPartition { recordsOfPartition: Iterator[String] =>
      val records = recordsOfPartition.toList
      records.foreach { record: String =>
        kafkaSink.value.sendMessage("realtime-output", record)
      }
    }
  })

For now, use df.write to avoid the lambda exception

// toDF requires the SparkSession implicits in scope
import sparkSession.implicits._

rdd.toDF("value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", XXX)
  .option("topic", XXX)
  .save()
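
The Kafka batch sink expects a string or binary value column plus a topic option, which is why the RDD is wrapped with toDF("value") first. Because the write is handled by the Kafka data source rather than a user closure, no producer or lambda ever has to be serialized.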