Flink中的检查点无法与CoFlatMapFunction一起使用

问题描述

嗨,我正在尝试在我的flink模块之一中执行检查点操作,如果我注释掉CoFlatMapFunction检查点是否在取消注释仍不起作用的情况下,我正在使用CoFlatMapFunction合并到流。我在flink网站上将Checkpointing更新为documentation,其中说,对于迭代流,添加一个额外的属性以在执行此操作后强制执行checkpoint事件,但该方法也无法正常工作,请在下面找到checkpoint设置

StateBackend stateBackend = new RocksDBStateBackend(path,true);

//env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE);

env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,true);

 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

 env.getCheckpointConfig().setCheckpointTimeout(120000);
 
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
 env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

 env.setStateBackend(stateBackend);

我可以看到任务状态已完成,但是自此以后我看不到日志

enter image description here

解决方法

我相信原因是FLINK-2491:只有在所有操作员/任务仍在运行时,检查点才有效。

您应该将注入集合中某些数据的数据源替换为一些其他数据源,而这些数据源不会立即过渡到已完成 ,也许是一个自定义数据源,一旦运行,该数据源便会继续存在发出的数据不足,但无所作为。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...