基于事件时间的Flink空闲状态保留

问题描述

这可能是一个简单的问题,但我在文档中找不到明确说明的内容:使用StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)时Flink的空闲状态保留是基于事件时间计算的,还是始终基于处理时间?

我的情况与Idle State Retention Time文档中描述的情况非常相似,因此我将以它为例。我正在计算每个会话在一段时间内的点击次数

SELECT sessionId,COUNT(*) FROM clicks GROUP BY sessionId;

,并期望会话最终将变为非活动状态,这是我希望执行空闲状态保留策略的地方。与示例唯一的不同是,在作业启动时,我编写了一个自定义源函数,该函数最初读取S3中的历史事件(处理最近N天的数据),然后切换到Kafka进行新的传入事件。假设我将闲置状态保留时间设置为72小时,并处理了S3过去一个月的数据-最终我希望状态的大小能够稳定到3天左右,同时处理S3的数据时处于非活动状态会话被删除。实际上,该状态在处理上个月的数据的整个过程中一直在增长。

不幸的是,我实际使用的时间窗口要长得多(当前的空闲状态保留时间设置为20天),所以我还没有机会查看状态达到该点时是否会缩小时间。我的源函数中有可能做错了一些事情,从而阻止了空闲状态保留清除的工作,因此我们将不胜感激。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)