Flink检查点模式ExactlyOnce无法按预期工作

问题描述

如果我理解错误,我是新手,请抱歉,我正在构建一个数据流应用程序,并且该流包含多个数据流,这些数据流检查传入的DataStream中是否存在必需的字段。我的应用程序验证传入的数据,如果数据验证成功,则应将数据附加到给定文件中(如果已存在)。我正在尝试模拟是否在一个DataStream中发生任何异常,因为我正在其中一个流中明确抛出异常,因此其他数据流不应受到影响。在下面的示例中,为简单起见,我使用Windows文本文件附加数据

注意:我的流程没有状态,因为我没有什么要存储的状态

public class ExceptionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

       // env.setParallelism(1);

        //env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint",true));

        // to set minimum progress time to happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 5000 ms,or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  
        
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,// number of restart attempts
                Time.of(10,TimeUnit.SECONDS) // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        
        DataStream<String> input2 = env.fromElements("hello");


        DataStream<String> output1 = input.flatMap(new FlatMapFunction<String,String>() {
            @Override
            public void flatMap(String value,Collector<String> out) throws Exception {
                //out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });


        DataStream<String> output2 = input.flatMap(new FlatMapFunction<String,Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });

       output2.addSink(new SinkFunction<String>() {
           @Override
           public void invoke(String value) throws Exception {
               try {
                File myObj = new File("C://flinkOutput//filename.txt");
                if (myObj.createNewFile()) {
                    System.out.println("File created: " + myObj.getName());
                    BufferedWriter out = new BufferedWriter(
                            new FileWriter("C://flinkOutput//filename.txt",true));
                    out.write(value);
                    out.close();
                    System.out.println("Successfully wrote to the file.");
                } else {
                    System.out.println("File already exists.");
                    BufferedWriter out = new BufferedWriter(
                            new FileWriter("C://flinkOutput//filename.txt",true));
                    out.write(value);
                    out.close();
                    System.out.println("Successfully wrote to the file.");
                }
            } catch (IOException e) {
                System.out.println("An error occurred.");
                e.printstacktrace();
            }
           }
       });

        env.execute();

    }

我对此毫无疑问

  1. 当我在output1流中引发异常时,即使遇到异常并将数据写入本地文件中后,第二个流output2仍在运行,但是当我检查文件时,输出如下

     hello world
     hello world
     hello world
     hello world
    
  2. 根据我对flink文档的了解,如果我将检查点模式用作EXACTLY_ONCE,则由于该过程已经完成并将数据写入文件,因此不应将数据写入文件的时间不超过一次。但是在我的情况下却没有发生,如果做错任何事我也不会得到

请帮助我清除对检查点的疑惑,以及如何实现我在flink中阅读有关TWO_PHASE_COMMIT的EXACTLY_ONCE机制的信息,但我没有获得有关如何实现它的任何示例。

如@Mikalai Lushchytski所建议,我在下面实现了StreamingSinkFunction

具有StreamingSinkFunction

public class ExceptionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

       // env.setParallelism(1);

        //env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint",Collector<String> out) throws Exception {
                //out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });
        
        
        DataStream<String> output2 = input.flatMap(new FlatMapFunction<String,Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });
        
        
        String outputPath = "C://flinkCheckpoint";

        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath),new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1)
                                .build())
                .build();
                
        
        output2.addSink(sink);

       
       });

        env.execute();

    }

但是当我检查Checkpoint文件夹时,我可以看到它创建了四个部分文件,如下所示

enter image description here

由于创建了多部分文件,我在做什么吗?

解决方法

为了保证端到端的一次精确记录传递(除了一次精确的状态语义),数据接收器需要参与检查点机制(以及数据源)。

如果要将数据写入文件,则可以使用StreamingFileSink,它将其输入元素发射到存储桶中的FileSystem文件中。它与检查点机制集成在一起,可提供开箱即用的语义。

如果要实现自己的接收器,则接收器功能必须实现CheckpointedFunction接口并正确实现snapshotState(FunctionSnapshotContext context)方法,该方法在请求检查点快照并刷新当前应用程序状态时调用。另外,我建议实现CheckpointListener接口,以便在分布式检查点完成后得到通知。

Flink已经提供了一个抽象TwoPhaseCommitSinkFunction,它是所有打算实现一次语义的SinkFunction的推荐基类。通过在CheckpointedFunctionCheckpointListener。例如,您可以查看FlinkKafkaProducer.java源代码。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...