问题描述
嗨,我正在尝试在我的flink模块之一中执行检查点操作,如果我注释掉CoFlatMapFunction检查点是否在取消注释仍不起作用的情况下,我正在使用CoFlatMapFunction合并到流。我在flink网站上将Checkpointing更新为documentation,其中说,对于迭代流,添加了一个额外的属性以在执行此操作后强制执行checkpoint事件,但该方法也无法正常工作,请在下面找到checkpoint设置
StateBackend stateBackend = new RocksDBStateBackend(path,true);
//env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,true);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.setStateBackend(stateBackend);
我可以看到任务状态已完成,但是自此以后我看不到日志
解决方法
我相信原因是FLINK-2491:只有在所有操作员/任务仍在运行时,检查点才有效。
您应该将注入集合中某些数据的数据源替换为一些其他数据源,而这些数据源不会立即过渡到已完成 ,也许是一个自定义数据源,一旦运行,该数据源便会继续存在发出的数据不足,但无所作为。