Flink 如何使用 CEP

问题描述

我有流媒体作业,它监听事件,使用 CEP 对它们进行操作。

流量是

stream = source
           .assignTimestampsAndWatermarks(...)
           .filter(...);

 CEP
   .pattern(stream.keysBy(e-> e.getId()),pattern)
   .process(PattenMatchProcessFunction)
   .sink(...);

键都是短暂的,进程函数不包含任何状态,也就是说可以通过设置ttl来删除状态。使用 EventTime 特性

我的问题,flink 如何处理过期的键,会对 GC 产生任何影响。 如果 flink 本身删除了密钥,那么这种情况发生的频率是多少。

面对 GC 问题,作业在部署 3 小时后卡住了。 正在做内存调优,但想消除这种情况。

解决方法

FsStateBackend 将为您的 CEP 操作员保存内存中的状态。

Flink 对 CEP 的作用是缓冲 MapState[Long,List[T]] 中的元素,该 // 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists by feeding them in the NFA // 3) advance the time to the current watermark,so that expired patterns are discarded. // 4) update the stored state for the key,by only storing the new NFA and MapState iff they have state to be used later. // 5) update the last seen watermark. 将时间戳映射到该时间到达的所有元素。一旦出现水印,Flink 将按如下方式处理缓冲的事件:

NFA.advanceTime

一旦事件处理完毕,Flink 将推进水印,这将导致状态中的旧条目过期(您可以在 {{1}} 中看到这一点)。这意味着驱逐您的元素取决于在您的流中创建和推送水印的频率。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...