问题描述
我自己想办法了!
您只需要在通过闭包之前对对象进行序列化,然后再进行反序列化即可。即使您的课程不是可序列化的,这种方法也行得通,因为它在后台使用了Kryo。您只需要一些咖喱。;)
这是我如何做的一个例子:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
随意使Blah变得很复杂,包括类,伴随对象,嵌套类,对多个3rd party库的引用。
KryoSerializationWrapper指的是:https ://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
解决方法
问题:
假设我的映射器可以是内部调用其他类并创建对象并在内部执行其他操作的函数(def)。(或者它们甚至可以是扩展(Foo)=>
Bar并在其apply方法中进行处理的类-但现在让我们忽略这种情况)
Spark仅支持Java序列化闭包。有什么办法吗?我们可以用某些东西代替闭包来做我想做的事吗?我们可以使用Hadoop轻松完成此类工作。这件事使Spark对我几乎无法使用。不能期望所有第三方库都具有所有扩展可序列化的类!
可能的解决方案:
像这样的事情似乎有用吗:https
:
//github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
看起来包装似乎是答案,但我不知道具体如何。