问题描述
我想在Spark文件流应用程序中实现检查点,以在任何情况下我的Spark流媒体应用程序停止/终止时处理hadoop中所有未处理的文件。我正在关注以下内容:streaming programming guide,但未找到JavaStreamingContextFactory。请帮我该怎么办。
我的代码是
public class StartAppWithCheckPoint {
public static void main(String[] args) {
try {
String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override public JavaStreamingContext create() {
SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext jssc = new JavaStreamingContext(sc,Durations.seconds(300));
JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
jssc.checkpoint(checkpointDirectory);
return jssc;
}
};
JavaStreamingContext context = JavaStreamingContext.getorCreate(checkpointDirectory,contextFactory);
context.start();
context.awaitTermination();
context.close();
sparkSession.close();
} catch(Exception e) {
e.printstacktrace();
}
}
}
解决方法
您必须使用Checkpointing
对于检查点,请使用updateStateByKey
或reduceByKeyAndWindow
的 stateful 转换。 spark-examples中提供了很多示例,以及git-hub中的预构建spark和spark源。有关您的具体信息,请参见JavaStatefulNetworkWordCount.java;