问题描述
如果我理解错误,我是新手,请抱歉,我正在构建一个数据流应用程序,并且该流包含多个数据流,这些数据流检查传入的DataStream中是否存在必需的字段。我的应用程序验证传入的数据,如果数据验证成功,则应将数据附加到给定文件中(如果已存在)。我正在尝试模拟是否在一个DataStream中发生任何异常,因为我正在其中一个流中明确抛出异常,因此其他数据流不应受到影响。在下面的示例中,为简单起见,我使用Windows文本文件附加数据
注意:我的流程没有状态,因为我没有什么要存储的状态
public class ExceptionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// env.setParallelism(1);
//env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint",true));
// to set minimum progress time to happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within 5000 ms,or are discarded
env.getCheckpointConfig().setCheckpointTimeout(5000);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // DELETE_ON_CANCELLATION
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,// number of restart attempts
Time.of(10,TimeUnit.SECONDS) // delay
));
DataStream<String> input1 = env.fromElements("hello");
DataStream<String> input2 = env.fromElements("hello");
DataStream<String> output1 = input.flatMap(new FlatMapFunction<String,String>() {
@Override
public void flatMap(String value,Collector<String> out) throws Exception {
//out.collect(value.concat(" world"));
throw new Exception("=====================NO VALUE TO CHECK=================");
}
});
DataStream<String> output2 = input.flatMap(new FlatMapFunction<String,Collector<String> out) throws Exception {
out.collect(value.concat(" world"));
}
});
output2.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) throws Exception {
try {
File myObj = new File("C://flinkOutput//filename.txt");
if (myObj.createNewFile()) {
System.out.println("File created: " + myObj.getName());
BufferedWriter out = new BufferedWriter(
new FileWriter("C://flinkOutput//filename.txt",true));
out.write(value);
out.close();
System.out.println("Successfully wrote to the file.");
} else {
System.out.println("File already exists.");
BufferedWriter out = new BufferedWriter(
new FileWriter("C://flinkOutput//filename.txt",true));
out.write(value);
out.close();
System.out.println("Successfully wrote to the file.");
}
} catch (IOException e) {
System.out.println("An error occurred.");
e.printstacktrace();
}
}
});
env.execute();
}
我对此毫无疑问
-
当我在output1流中引发异常时,即使遇到异常并将数据写入本地文件中后,第二个流output2仍在运行,但是当我检查文件时,输出如下
hello world hello world hello world hello world
-
根据我对flink文档的了解,如果我将检查点模式用作EXACTLY_ONCE,则由于该过程已经完成并将数据写入文件,因此不应将数据写入文件的时间不超过一次。但是在我的情况下却没有发生,如果做错任何事我也不会得到
请帮助我清除对检查点的疑惑,以及如何实现我在flink中阅读有关TWO_PHASE_COMMIT的EXACTLY_ONCE机制的信息,但我没有获得有关如何实现它的任何示例。
如@Mikalai Lushchytski所建议,我在下面实现了StreamingSinkFunction
具有StreamingSinkFunction
public class ExceptionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// env.setParallelism(1);
//env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint",Collector<String> out) throws Exception {
//out.collect(value.concat(" world"));
throw new Exception("=====================NO VALUE TO CHECK=================");
}
});
DataStream<String> output2 = input.flatMap(new FlatMapFunction<String,Collector<String> out) throws Exception {
out.collect(value.concat(" world"));
}
});
String outputPath = "C://flinkCheckpoint";
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath),new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1)
.build())
.build();
output2.addSink(sink);
});
env.execute();
}
但是当我检查Checkpoint文件夹时,我可以看到它创建了四个部分文件,如下所示
由于创建了多部分文件,我在做什么吗?
解决方法
为了保证端到端的一次精确记录传递(除了一次精确的状态语义),数据接收器需要参与检查点机制(以及数据源)。
如果要将数据写入文件,则可以使用StreamingFileSink,它将其输入元素发射到存储桶中的FileSystem
文件中。它与检查点机制集成在一起,可提供开箱即用的语义。
如果要实现自己的接收器,则接收器功能必须实现CheckpointedFunction
接口并正确实现snapshotState(FunctionSnapshotContext context)
方法,该方法在请求检查点快照并刷新当前应用程序状态时调用。另外,我建议实现CheckpointListener
接口,以便在分布式检查点完成后得到通知。
Flink已经提供了一个抽象TwoPhaseCommitSinkFunction
,它是所有打算实现一次语义的SinkFunction
的推荐基类。通过在CheckpointedFunction
和
CheckpointListener
。例如,您可以查看FlinkKafkaProducer.java源代码。