问题描述
我有一个将 csv 数据读入 Spark 的 Dataset
的类。如果我只是简单地读取并返回 data
,一切正常。
但是,如果我在从函数返回之前将 MapFunction
应用到 data
,我会得到
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: com.Workflow
。
我知道 Spark 的工作原理以及它需要序列化对象以进行分布式处理,但是,我没有在映射逻辑中使用对 Workflow
类的任何引用。我没有在我的映射逻辑中调用任何 Workflow
类函数。那么为什么 Spark 试图序列化 Workflow
类?任何帮助将不胜感激。
public class Workflow {
private final SparkSession spark;
public Dataset<Row> readData(){
final StructType schema = new StructType()
.add("text","string",false)
.add("category",false);
Dataset<Row> data = spark.read()
.schema(schema)
.csv(dataPath);
/*
* works fine till here if I call
* return data;
*/
Dataset<Row> cleanedData = data.map(new MapFunction<Row,Row>() {
public Row call(Row row){
/* some mapping logic */
return row;
}
},RowEncoder.apply(schema));
cleanedData.printSchema();
/* .... ERROR .... */
cleanedData.show();
return cleanedData;
}
}
解决方法
匿名内部类具有对封闭类的隐藏/隐式引用。使用 Lambda 表达式或使用 Roma Anankin 的解决方案
,您可以让 Workflow 将 Serializeble 和 SparkSession 实现为@transient