问题描述
问。状态是否超时并同时被删除
要么
只有状态超时,ProcessingTimeout 和 EventTimeout 的状态仍然保持?
我正在对 mapGroupsWithState/flatmapGroupsWithState 进行一些实验,并且对状态超时有些困惑。
考虑到我正在维护一个带有 10 秒水印的状态并根据事件时间应用超时说:
ds.withWatermark("timestamp","10 seconds")
.groupByKey(...)
.mapGroupsWithState(
GroupStateTimeout.EventTimeTimeout)( //event timed out
...)(my_mapping_function)
在我的映射函数中说
我正在根据状态的存在执行一些操作。
我正在检查它:
//Considering it my_mapping_function for mapGroupsWithState/flatmapGroupsWithState
if(state.hasTimeout){
println("State has timedout")
state.remove()
}
else
{
val newState = state.getoption match {
case Some(s) =>
....//some operations
case _ =>
println("no state")
..return some state
state.update(newState)
//set the timeout,Does state also gets removed automatically when state has timed out?
state.setTimeoutTimestamp(state.getCurrentWatermarkMs,"10 seconds")
}
现在考虑一个水印设置为(10秒)的例子:
带有 ts 12 seconds
的传入数据 (data1)
带有 ts 20 seconds
的传入数据 (data1)
所以水印到这里将是 (20-10) = 10 秒
带有 ts 12 seconds
的传入数据 (data2)
(data2) 的状态将在 20 seconds
超时
(如 10 秒(水印时间)+ 10 秒(我们设置了额外的超时时间)
所以如果传入的数据(data1)带有 ts 20 seconds
lly,传入数据 (data1) 与 ts 30 seconds
lly,传入数据 (data1) 与 ts 40 seconds
到这里,水印现在是20秒。 (40-10)
所以 data2 的状态是超时,因为最后一个数据长达 12 秒
问。当 data2 的状态超时时,状态只是超时还是状态被删除?
因为它没有打印 println("State has timedout")
它打印了println("no state")
。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)