陈述生存时间它如何与Apache Flink CEP模式一起使用?

问题描述


我阅读了有关状态生存时间的Apache Flink文档https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live- ttl
而且我一时也不明白。
1)
StateTTLConfig ttl = StateTtlConfig
.newBuilder(Time.minutes(60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();

//And use in my Process Function 

valueStateDescriptor.enableTimetoLive(ttl);

如果我将在15:00中将ValueState放入某个元素,然后使用保存点停止我的工作,而只有在17:00,我将从上一个保存点开始我的工作。
价值状态会很清楚,对吗?
2)如果我使用Apache Flink CEP模式:

.begin("a")
.where(simpleConditionA)
.followedBy("b")
.where(simpleConditionB)
.within(Time.minutes(60));

如果我将在15:00获得A元素,然后通过保存点停止我的工作,而只有在17:00,我将从上一个保存点开始我的工作。并得到B元素,图案不匹配我对吗?
(ttl)如何与Apache Flink CEP模式一起使用?
谢谢。

我了解CEP,实际上是在利用摄取时间。我将尝试解释:我将Process Function与ValueState和TimerTime一起使用,并在onTimer方法中清除状态。我将某些元素置于状态(keyedstate),将计时器设置为1小时并执行一些逻辑。基本上将值状态+计时器用作输出限制器(1小时内有1条输出消息)。在我的公司中,我们需要停止在群集上运行作业(使用保存点),然后几个小时后,我们需要从上一个保存点重新启动作业。现在我不使用TTL,并且重新启动后,我的ValueState.value不为null。我希望在重启后不到一个小时的时间内ValueState.value不为null(如果我在停止之前进入状态),但超过一个小时的值状态始终为null。
附言:我使用RrocksDb状态后端,间隔为1s的增量检查点。效果很好。))

解决方法

如果我将在15:00放入ValueState中的某个元素,然后使用保存点停止我的工作,仅在17:00我将从上一个保存点开始我的工作。 价值状态会很清楚,对吗?

(1)此ValueState将有效地消失,但是我不确定它是否会消失。如果您的状态TTL配置中包含cleanupFullSnapshot(),则可以保证,如果您在16:00之后获取了保存点,则保存点将不包含问题状态。但是在这种情况下,似乎所有这些都不成立,因此状态在快照中。我不知道是否在快照还原期间或下一次清理期间删除了自到期以来的状态。但是,由于您指定了NeverReturnExpired,因此它不会影响结果。

它(ttl)如何与Apache Flink CEP模式一起使用?

(2)CEP不使用状态TTL。 CEP会在可能影响模式匹配的时间内保持状态,并在不再需要状态时明确清除状态。根据您对这个问题的表述方式,我认为您使用的是处理时间,而不是事件时间。在这种情况下,模式将在60分钟内不匹配。但是,如果要使用事件时间,则将使用水印确定已花费了多少时间,并且停机时间对模式匹配没有影响。

更新:

我现在看到您正在使用摄取时间,并且依靠计时器来清除状态。使用摄取时间,您可以选择使用事件时间或处理时间计时器。如果使用处理时间计时器,则在作业未运行时应触发的所有计时器将在作业重新启动后立即触发。使用事件时间计时器,只要水印达到这些计时器中的时间,它们就会触发。由于水印未保存在保存点中,因此在创建任何水印之前,必须先流并处理一些事件(并且使用定期水印,则必须经过自动水印间隔)。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...