问题描述
我有流媒体作业,它监听事件,使用 CEP 对它们进行操作。
流量是
stream = source
.assignTimestampsAndWatermarks(...)
.filter(...);
CEP
.pattern(stream.keysBy(e-> e.getId()),pattern)
.process(PattenMatchProcessFunction)
.sink(...);
键都是短暂的,进程函数不包含任何状态,也就是说可以通过设置ttl来删除状态。使用 EventTime 特性
我的问题,flink 如何处理过期的键,会对 GC 产生任何影响。 如果 flink 本身删除了密钥,那么这种情况发生的频率是多少。
面对 GC 问题,作业在部署 3 小时后卡住了。 正在做内存调优,但想消除这种情况。
解决方法
FsStateBackend
将为您的 CEP 操作员保存内存中的状态。
Flink 对 CEP 的作用是缓冲 MapState[Long,List[T]]
中的元素,该 // 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists by feeding them in the NFA
// 3) advance the time to the current watermark,so that expired patterns are discarded.
// 4) update the stored state for the key,by only storing the new NFA and MapState iff they have state to be used later.
// 5) update the last seen watermark.
将时间戳映射到该时间到达的所有元素。一旦出现水印,Flink 将按如下方式处理缓冲的事件:
NFA.advanceTime
一旦事件处理完毕,Flink 将推进水印,这将导致状态中的旧条目过期(您可以在 {{1}} 中看到这一点)。这意味着驱逐您的元素取决于在您的流中创建和推送水印的频率。