重新启动任务管理器后,Apache Flink丢失记录

问题描述

我正在将Flink集群与一个作业管理器容器和一个Kubernetes群集中的两个任务管理器容器一起使用。当我将流作业提交给作业管理器时,它将运行该作业,并且我将输出接收到接收器中。我还启用了检查点以从故障中恢复。现在,当我有意删除一个任务管理器窗格以验证flink中的节点故障处理时,我看到未收到那些假定到达接收器的记录。 当kubernetes自动重启Pod时,它将继续处理记录,但不会从检查点恢复。我正在使用以下命令提交作业

flink run -Dparallelism=2 -m localhost:<port> -c <flink job> -p=2 <flink job>.jar

我在工作环境中有以下事情:

 env.enableCheckpointing(10000)
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
   
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStateBackend(new FsStateBackend(Paths.get(<checkpoint path>).toUri,false))

任务管理器窗格重新启动时,我有以下日志。

2020-10-01 10:01:30,096 INFO  org.apache.flink.runtime.blob.BlobClient                     [] - Downloading 2966c462794bf94523e9a53c1d9a2f13/p-421bdbdb924a09ddc017b854d52d9a9457109d43-7307729d1f0408de20cd29e352a2a655 from flinksessioncluster-sample-jobmanager/172.20.225.40:6124

但是在检查点目录2966c462794bf94523e9a53c1d9a2f13中,我只有以下各项。

chk-299  shared  taskowned

我在目录2966c462794bf94523e9a53c1d9a2f13中没有目录p-421bdbdb924a09ddc017b854d52d9a9457109d43-7307729d1f0408de20cd29e352a2a655

根据文档,任务应自动从检查点位置恢复。

请让我知道这可能是问题所在。

更新

进行了实际测试-

以“ t”秒间隔连续将记录插入flink作业中。当任务管理器处理记录时,我杀死了一个任务管理器窗格。此时,我停止了将记录插入到flink作业中。在作业的输入端,我向其中插入了1000条记录。当任务管理器再次出现时,我在接收器中有700条记录。

现在我开始一次插入一条记录,发现接收器中的记录突然增加到940,然后开始增加1,即在任务管理器崩溃后插入的记录开始接收。但是我丢失了任务管理器崩溃前插入的最初1000条记录中的60条记录

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...