如何在Flink CEP中指定应维持状态的时间

问题描述

让我解释一下我需要处理的情况。让我们假设三个设备A,B,C正在将日志发送到flink CEP进行处理。让我们假设模式为A,然后是5分钟B,然后是5分钟后是C。让我们假设B设备关闭并在50分钟后发送日志的情况。因此,在这种情况下,所有事件都将被丢弃。我只是想知道flink中是否有任何支持将状态保持到特定的定义间隔(在我的情况下,假设1天,这意味着A和C日志都将存储1天,之后日志将被保存)如果没有匹配,则将其删除)。请从CEP的角度提出可行性。

解决方法

据我所知,没有什么比untilwithin更具体了,但是这些都是用来指定t的。这取决于您的确切设置,但是如果将所有数据归为一个主题,则可能很难避免设备长时间停机。您可以尝试修改水印生成逻辑,但这通常会延迟输出。

在这种情况下,您可以考虑将ProcessFunction与自定义逻辑结合使用,该逻辑更加灵活,可以让您以更好的粒度处理状态。

编辑:

因此,基本上,您需要创建一个状态来保存部分匹配项,具体取决于它可能是ListStateValueState的情况,然后只需在其中找到您发现的部分匹配项即可。因此,如果您要A-> B-> C,那么如果您有A,则将其检查并放入状态,然后,如果收到B,则可以检查时间戳并将其附加到状态,最后如果您有C您可以发出整个比赛并清除状态。

如果您在此处设置stateTTL,这只会断言该状态在一段时间内不会被读写后会自动清除。

还请注意,如果模式不是很复杂,这是有意义的,否则会很快成为编写逻辑代码的噩梦。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...