Problem description
- My program ran fine at first.
- After I added the Spark BloomFilter, it started failing after a few batches with
Could not serialize lambda
- I tried registering the classes with Kryo, but that also failed. What should I do? Thanks.
conf.registerKryoClasses(Array(
  Class.forName("org.apache.spark.util.sketch.BloomFilterImpl"),
  Class.forName("org.apache.spark.util.sketch.BloomFilter"),
  Class.forName("org.apache.spark.util.sketch.BitArray"),
  Class.forName("com.jk.utils.KafkaSink"),
  Class.forName("com.jk.utils.KafkaSink$$anonfun$1"),
  Class.forName("org.apache.kafka.clients.producer.KafkaProducer"),
  Class.forName("org.apache.kafka.clients.producer.internals.RecordAccumulator"),
  Class.forName("org.apache.kafka.clients.ApiVersions"),
  Class.forName("org.json4s.JsonAST$JObject"),
  Class.forName("org.json4s.JsonAST$JArray"),
  Class.forName("org.json4s.JsonAST$JString"),
  Class.forName("org.json4s.JsonAST$JInt"),
  Class.forName("org.json4s.JsonAST$JBool"),
  Class.forName("org.json4s.JsonAST$JLong"),
  Class.forName("scala.math.BigInt"),
  Class.forName("java.math.BigInteger"),
  Class.forName("org.apache.kafka.common.metrics.KafkaMetric"),
  Class.forName("org.apache.kafka.common.metrics.Metrics"),
  Class.forName("org.apache.kafka.clients.producer.internals.BufferPool")
))
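For reference, conf.registerKryoClasses only tells Kryo about the classes; Kryo must already be the configured serializer for the registration to matter. A minimal sketch (names assumed, not the asker's exact setup) that registers the public classes with classOf, while Class.forName remains necessary for package-private ones such as BloomFilterImpl and BitArray:

import org.apache.spark.SparkConf
import org.apache.spark.util.sketch.BloomFilter
import org.apache.kafka.clients.producer.KafkaProducer

// Sketch: make Kryo the active serializer and register the public classes directly.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(
    classOf[BloomFilter],
    classOf[KafkaProducer[_, _]]
  ))

Registration alone does not fix this particular failure, though, because the object that cannot be serialized is a JDK lambda inside the Kafka metrics, not an unregistered class.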
Main code
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.sketch.BloomFilter

var bf: Broadcast[BloomFilter] = sc.broadcast(BloomFilter.create(10000000L))
kafkaDstream.foreachRDD(record => {
  val rdd = record.filter(msg => {
    // filter with the broadcast BloomFilter
    !bf.value.mightContainString(msg.value)
  })
  ...
  // save rdd to kafka
})
Kafka send utility
import java.util.Properties
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConversions._

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}
object KafkaSink {
  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: Properties): KafkaSink[K, V] =
    apply(config.toMap)

  def getKafkaProducer(sparkSession: SparkSession, conf: CommonConf): Broadcast[KafkaSink[String, String]] = {
    val kafkaProducerConfig = {
      val properties = new Properties()
      properties.setProperty("bootstrap.servers", conf.getOrElse("kafka.bootstrap.servers", ""))
      properties.setProperty("key.serializer", conf.getOrElse("kafka.keySerializer", "org.apache.kafka.common.serialization.StringSerializer"))
      properties.setProperty("value.serializer", conf.getOrElse("kafka.valueSerializer", "org.apache.kafka.common.serialization.StringSerializer"))
      properties
    }
    sparkSession.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
  }
}
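One thing to note about this utility: producer is a plain lazy val, so once an executor has used it, the live KafkaProducer becomes part of the broadcast value. The trace below shows the consequence: the block is evicted from memory, Kryo re-serializes it to disk, and fails on a JDK lambda inside the producer's metrics. A minimal sketch (my suggestion, not confirmed by the asker) of the same class with the producer marked @transient, so re-serialization skips it and it is simply re-created on next use:

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  // @transient keeps the live producer out of any (re-)serialization of this object;
  // lazy val re-creates it on first use after deserialization.
  @transient lazy val producer: KafkaProducer[K, V] = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}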
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: Could not serialize lambda
Serialization trace:
metricValueProvider (org.apache.kafka.common.metrics.KafkaMetric)
metrics (org.apache.kafka.common.metrics.Metrics)
metrics (org.apache.kafka.clients.producer.internals.BufferPool)
free (org.apache.kafka.clients.producer.internals.RecordAccumulator)
accumulator (org.apache.kafka.clients.producer.KafkaProducer)
producer (com.jk.utils.MyKafkaUtils)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:241)
at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1524)
at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1519)
at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1519)
at org.apache.spark.storage.memory.MemoryStore.org$apache$spark$storage$memory$MemoryStore$$dropBlock$1(MemoryStore.scala:473)
at org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$1.apply$mcVI$sp(MemoryStore.scala:499)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:490)
at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:92)
at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:73)
at org.apache.spark.memory.UnifiedMemoryManager.acquireStorageMemory(UnifiedMemoryManager.scala:179)
at org.apache.spark.memory.UnifiedMemoryManager.acquireUnrollMemory(UnifiedMemoryManager.scala:186)
at org.apache.spark.storage.memory.MemoryStore.reserveUnrollMemoryForThisTask(MemoryStore.scala:552)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:229)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1176)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1167)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1102)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1167)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:925)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1492)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:240)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:211)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1329)
... 14 more
Caused by: java.lang.RuntimeException: Could not serialize lambda
at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:69)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
... 62 more
Caused by: java.lang.NoSuchMethodException: org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$$Lambda$123/1592295195.writeReplace()
at java.lang.Class.getDeclaredMethod(Class.java:2130)
at com.esotericsoftware.kryo.serializers.ClosureSerializer.write(ClosureSerializer.java:60)
... 64 more
Solution
I think this should work for you, because you are broadcasting the object rather than a lambda function. It worked for me on a DataStax Spark cluster.
val kafkaSink = sc.broadcast(KafkaSink(kafkaOptionsProducer))

dstream.foreachRDD((rdd: RDD[String]) => {
  rdd.foreachPartition { recordsOfPartition: Iterator[String] =>
    val records = recordsOfPartition.toList
    records.foreach { record: String =>
      kafkaSink.value.send("realtime-output", record)
    }
  }
})
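A variant of the same pattern that streams the partition iterator instead of materializing it with toList (sketch only; it assumes the send(topic, value) method from the KafkaSink shown in the question):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Iterate lazily instead of collecting the whole partition into a List first.
    partition.foreach { record =>
      kafkaSink.value.send("realtime-output", record)
    }
  }
}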
Alternatively, use df.write for now to avoid the lambda exception:
rdd.toDF("value")
.write
.format("kafka")
.option("kafka.bootstrap.servers",XXX)
.option("topic",XXX)
.save()
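Note that the "kafka" DataFrame sink is not part of Spark core; a sketch of the build dependency it usually needs (the version number is an assumption, align it with your Spark build):

// build.sbt -- version is an assumption; match it to your Spark version
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.8"

Also remember that rdd.toDF requires import spark.implicits._ to be in scope.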