I have the following transformation:
JavaRDD<Tuple2<String, Long>> mappedRdd = myRDD.values().map(
    new Function<Pageview, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> call(Pageview pageview) throws Exception {
            String key = pageview.getUrl().toString();
            Long value = getDay(pageview.getTimestamp());
            return new Tuple2<>(key, value);
        }
    });
Pageview is a type: Pageview.java
I register this class with Spark like this:
Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);
Yet when I run the job, it fails with:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
    at org.apache.gora.tutorial.log.ExampleSpark.run(ExampleSpark.java:100)
    at org.apache.gora.tutorial.log.ExampleSpark.main(ExampleSpark.java:53)
Caused by: java.io.NotSerializableException: org.apache.gora.tutorial.log.ExampleSpark
Serialization stack:
    - object not serializable (class: org.apache.gora.tutorial.log.ExampleSpark, value: org.apache.gora.tutorial.log.ExampleSpark@1a2b4497)
    - field (class: org.apache.gora.tutorial.log.ExampleSpark$1, name: this$0, type: class org.apache.gora.tutorial.log.ExampleSpark)
    - object (class org.apache.gora.tutorial.log.ExampleSpark$1, org.apache.gora.tutorial.log.ExampleSpark$1@4ab2775d)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 7 more
When I step through the code in a debugger, I see JavaSerializer.scala being invoked, even though there is a class named KryoSerializer.
PS 1: I don't want to use the Java serializer, and implementing Serializable on Pageview does not solve the problem.
PS 2: This does not solve the problem either:
...
//String key = pageview.getUrl().toString();
//Long value = getDay(pageview.getTimestamp());
String key = "Dummy";
Long value = 1L;
return new Tuple2<>(key, value);
...
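The dummy body in PS 2 cannot help, because the failure is about what the anonymous class *captures*, not what it computes: the serialization stack above shows a hidden `this$0` field pointing at the non-serializable enclosing class `ExampleSpark`. This can be demonstrated without Spark at all; in the sketch below, `SerFn` and `CaptureDemo` are made-up names standing in for Spark's `Function` and the enclosing job class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Spark-free sketch: an anonymous class created in instance context drags
// its (non-serializable) enclosing instance into serialization via this$0.
public class CaptureDemo {
    // Stand-in for Spark's Function, which extends Serializable.
    interface SerFn extends Serializable {
        Long call(String s);
    }

    // Instance context: the anonymous class gets a hidden this$0 field
    // referencing the enclosing CaptureDemo, which is NOT Serializable.
    final SerFn instanceFn = new SerFn() {
        @Override
        public Long call(String s) { return 1L; } // dummy body, still fails
    };

    // Static context: no enclosing instance is captured.
    static final SerFn staticFn = new SerFn() {
        @Override
        public Long call(String s) { return 1L; }
    };

    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("instance fn serializable: " + isSerializable(new CaptureDemo().instanceFn));
        System.out.println("static fn serializable: " + isSerializable(staticFn));
    }
}
```

Running this prints `false` for the instance-context function and `true` for the static one, mirroring the Spark failure.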
Solution:
I have run into this problem many times with Java code. Note that registerKryoClasses only affects how data is serialized; Spark still serializes the closure itself with Java serialization. So either make the class containing the code Serializable, or, if you don't want to do that, make the Function a static member of the class:
public class Test {
    // A static member has no hidden this$0 reference to an enclosing
    // Test instance, so Spark can serialize the function on its own.
    // Note: getDay must also be a static method in this context.
    private static final Function<Pageview, Tuple2<String, Long>> s =
        new Function<Pageview, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> call(Pageview pageview) throws Exception {
                String key = pageview.getUrl().toString();
                Long value = getDay(pageview.getTimestamp());
                return new Tuple2<>(key, value);
            }
        };
}
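The other option, making the enclosing class Serializable, also works: the captured `this$0` reference can then be written along with the function. The trade-off is that every field of the enclosing class is dragged into the serialized closure, which is why the static-member version is usually preferable. A Spark-free sketch (class and interface names are made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Variant: the enclosing class itself is Serializable, so the hidden
// this$0 reference inside the anonymous class can be serialized too
// (together with all other fields of SerializableOuter).
public class SerializableOuter implements Serializable {
    interface SerFn extends Serializable {
        Long call(String s);
    }

    final SerFn fn = new SerFn() {
        @Override
        public Long call(String s) { return (long) s.length(); }
    };

    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Succeeds because the captured enclosing instance is Serializable.
        System.out.println("fn serializable: " + isSerializable(new SerializableOuter().fn));
    }
}
```

This prints `true`, unlike the instance-context case with a non-serializable outer class.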