java - Task not serializable on Spark

I have a transformation like this:

JavaRDD<Tuple2<String, Long>> mappedRdd = myRDD.values().map(
    new Function<Pageview, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> call(Pageview pageview) throws Exception {
        String key = pageview.getUrl().toString();
        Long value = getDay(pageview.getTimestamp());
        return new Tuple2<>(key, value);
      }
    });

Pageview is a type: Pageview.java

Then I register this class with Spark like this:

Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);

And this is the exception I get:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
    at org.apache.gora.tutorial.log.ExampleSpark.run(ExampleSpark.java:100)
    at org.apache.gora.tutorial.log.ExampleSpark.main(ExampleSpark.java:53)
Caused by: java.io.NotSerializableException: org.apache.gora.tutorial.log.ExampleSpark
Serialization stack:
    - object not serializable (class: org.apache.gora.tutorial.log.ExampleSpark, value: org.apache.gora.tutorial.log.ExampleSpark@1a2b4497)
    - field (class: org.apache.gora.tutorial.log.ExampleSpark$1, name: this$0, type: class org.apache.gora.tutorial.log.ExampleSpark)
    - object (class org.apache.gora.tutorial.log.ExampleSpark$1, org.apache.gora.tutorial.log.ExampleSpark$1@4ab2775d)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 7 more

When I debug the code, I can see that JavaSerializer.scala is called, even though there is a class named KryoSerializer.
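
As far as I can tell, registerKryoClasses only registers the classes with Kryo; the data serializer itself is switched with the spark.serializer setting, and even then Spark still serializes the closure (the Function object itself) with the Java serializer, which would explain JavaSerializer showing up in the stack. A minimal configuration sketch (the application name is made up):

SparkConf sparkConf = new SparkConf().setAppName("ExampleSpark");
// Make Kryo the serializer for shuffled/cached data and register the data class.
// This does not change how the closure itself is serialized.
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.registerKryoClasses(new Class[] { Pageview.class });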

PS 1: I don't want to use the Java serializer, but implementing Serializable on Pageview does not solve the problem.

PS 2: This does not solve the problem either:

...
//String key = pageview.getUrl().toString();
//Long value = getDay(pageview.getTimestamp());
String key = "Dummy";
Long value = 1L;
return new Tuple2<>(key, value);
...
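
Presumably that is because the anonymous Function keeps an implicit this$0 reference to the enclosing ExampleSpark instance (the exact field named in the serialization stack above), so Spark tries to serialize the whole ExampleSpark object no matter what call returns. For comparison, here is a minimal sketch of the Function as its own top-level class, which captures no enclosing instance; the class name is made up and the real key/value logic is left out:

import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

// Standalone Function: there is no enclosing instance to drag along, so only this
// small object itself (Function extends Serializable) is shipped with the task.
public class PageviewToPair implements Function<Pageview, Tuple2<String, Long>> {
  @Override
  public Tuple2<String, Long> call(Pageview pageview) throws Exception {
    // placeholder logic; getDay(...) would have to live in a static utility to be usable here
    return new Tuple2<>(pageview.getUrl().toString(), pageview.getTimestamp());
  }
}

The map call then becomes myRDD.values().map(new PageviewToPair()).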

Solution:

I have run into this problem many times with Java code. Even though I was using Java serialization, I would make the class that contains the code Serializable, or, if you don't want to do that, make the Function a static member of the class.

Here is a code snippet of the solution.

public class Test {
   // getDay(...) must be a static method here (of Test or some utility class),
   // since it is called from a static context
   private static Function<Pageview, Tuple2<String, Long>> s =
       new Function<Pageview, Tuple2<String, Long>>() {

         @Override
         public Tuple2<String, Long> call(Pageview pageview) throws Exception {
           String key = pageview.getUrl().toString();
           Long value = getDay(pageview.getTimestamp());
           return new Tuple2<>(key, value);
         }
       };
}
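
The map call then only references the static field, e.g. myRDD.values().map(Test.s), and the anonymous class no longer has an enclosing instance because it is created in a static context (getDay does have to be a static method for this to compile). For the other option mentioned above, here is a minimal sketch of making the driver class itself Serializable; the method name, key type and getDay body are assumptions, and any non-serializable fields (a JavaSparkContext, a Gora DataStore, ...) would have to be marked transient:

import java.io.Serializable;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class ExampleSpark implements Serializable {

  // your getDay helper; the body here is only an assumption
  private Long getDay(Long timestamp) {
    return timestamp / (24L * 60 * 60 * 1000L);
  }

  public JavaRDD<Tuple2<String, Long>> mapToDayPairs(JavaPairRDD<Long, Pageview> myRDD) {
    return myRDD.values().map(
        new Function<Pageview, Tuple2<String, Long>>() {
          @Override
          public Tuple2<String, Long> call(Pageview pageview) throws Exception {
            // getDay is an instance method, so `this` (the ExampleSpark instance) is
            // captured, but it is now Serializable and can be shipped to the executors
            return new Tuple2<>(pageview.getUrl().toString(), getDay(pageview.getTimestamp()));
          }
        });
  }
}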
