Java Spark 数据集 MapFunction - 任务在没有任何类引用的情况下不可序列化

问题描述

我有一个将 csv 数据读入 Spark 的 Dataset 的类。如果我只是简单地读取并返回 data,一切正常。

但是,如果我在从函数返回之前将 MapFunction 应用到 data,我会得到

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

Caused by: java.io.NotSerializableException: com.Workflow

我知道 Spark 的工作原理以及它需要序列化对象以进行分布式处理,但是,我没有在映射逻辑中使用对 Workflow 类的任何引用。我没有在我的映射逻辑中调用任何 Workflow函数。那么为什么 Spark 试图序列化 Workflow 类?任何帮助将不胜感激。

public class Workflow {

    private final SparkSession spark;   

    public Dataset<Row> readData(){
        final StructType schema = new StructType()
            .add("text","string",false)
            .add("category",false);

        Dataset<Row> data = spark.read()
            .schema(schema)
            .csv(dataPath);

        /* 
         * works fine till here if I call
         * return data;
         */

        Dataset<Row> cleanedData = data.map(new MapFunction<Row,Row>() {
            public Row call(Row row){
                /* some mapping logic */
                return row;
            }
        },RowEncoder.apply(schema));

        cleanedData.printSchema();
        /* .... ERROR .... */
        cleanedData.show();

        return cleanedData;
    }
}

解决方法

匿名内部类具有对封闭类的隐藏/隐式引用。使用 Lambda 表达式或使用 Roma Anankin 的解决方案

,

您可以让 Workflow 将 Serializeble 和 SparkSession 实现为@transient