问题描述
我已将一个广播数组列表定义为公共静态,并且此数组列表(数组列表的名称:“qList”)在作业处理程序方法中开始新作业时填充了新值,然后在 DStream lambda 闭包中使用此数组列表,但是当在火花集群上运行,作业失败并显示消息“空指针异常”:
Caused by: org.apache.spark.SparkException: Job aborted due to stage 失败:阶段 17.0 中的任务 1 失败 4 次,最近失败: 阶段 17.0 中丢失的任务 1.3(TID 40、192.168.1.97、执行者 0): java.lang.NullPointerException 在 QProcessing.lambda$3(QProcessing.java:345) ...
我的代码:
@Override
public void onBatchSubmitted(StreamingListenerBatchSubmitted arg0) {
// Todo Auto-generated method stub
QProcessing.qList.value().clear();
for(int i = 0; i < 2; i++)
try {
QProcessing.qList.value().add(i,QProcessing.bufferedReader.readLine());
} catch (IOException e) {
// Todo Auto-generated catch block
e.printstacktrace();
}
}
...
private static JavaPairDStream<Long,List<String>> distributeSerach(
JavaPairDStream<Long,BPlusTree<Integer,String>> inputRDD,int role,int accessControlType,boolean topkAttach,int i) {
return inputRDD.mapToPair(index -> {
List<String> searchResult = null;
Instant startdistributedBPTSearch = Instant.Now();
searchResult = index._2.searchRange(Integer.parseInt(QProcessing.qList.value()[i].split(",")[0]),BPlusTree.RangePolicy.INCLUSIVE,Integer.parseInt(QProcessing.qList.value()[i].split(",")[1]),role,accessControlType,topkAttach);
Instant enddistributedBPTSearch = Instant.Now();
Duration timeElapseddistributedBPTSearch = Duration.between(startdistributedBPTSearch,enddistributedBPTSearch);
Tuple2<Long,List<String>> tuple = new Tuple2<Long,List<String>>(
timeElapseddistributedBPTSearch.toMillis(),searchResult);
return tuple;
});
}
解决方法
使用spark执行指令的地方存在差异。 RDD 的定义(仅它们的实例化,而不是它们的使用)是在驱动程序中进行的,而对 RDD 的修改和操作是在执行程序(您的 lambda)中进行的。
这些部分中的每一个都运行在具有不同 JVM 的不同机器上。如果修改一个静态属性,它只会改变本地JVM,因此在驱动程序属性上添加元素是不够的。
我认为最好的解决方案不是使用 lambda,而是拥有一个对象并添加广播变量。类似的东西。
// You must define what are your A,B and C types
public class MapToPairFunction extends PairFunction<A,B,C> {
private Broadcast<List<String>> broadcast;
public void setBroadcast(Broadcast<List<String>> broadcast) {
this.broadcast = broadcast;
}
@Override
public Tuple2<B,C> call(final A parameter) {
// Here the code in the lambda
}
}
private static JavaPairDStream<Long,List<String>> DistributeSerach(
JavaPairDStream<Long,BPlusTree<Integer,String>> inputRDD,int role,int accessControlType,boolean topkAttach,int i) {
PairFunction<A,C> pairFunction = new MapToPairFunction();
pairFunction.setBroadcast(QProcessing.qList);
return inputRDD.mapToPair(pairFunction);
}
很久没接触过spark了,希望对你有帮助