状态是否也会在事件超时时通过 Spark 结构化流删除?

问题描述


问。状态是否超时并同时被删除
要么
只有状态超时,ProcessingTimeout 和 EventTimeout 的状态仍然保持?

我正在对 mapGroupsWithState/flatmapGroupsWithState 进行一些实验,并且对状态超时有些困惑。

考虑到我正在维护一个带有 10 秒水印的状态并根据事件时间应用超时说:

ds.withWatermark("timestamp","10 seconds")
  .groupByKey(...)
  .mapGroupsWithState(
    GroupStateTimeout.EventTimeTimeout)( //event timed out
    ...)(my_mapping_function)

在我的映射函数中说

我正在根据状态的存在执行一些操作。
我正在检查它:

//Considering it my_mapping_function for mapGroupsWithState/flatmapGroupsWithState

if(state.hasTimeout){
  println("State has timedout")
  state.remove()
}
else 
{
   val newState = state.getoption match {
                  case Some(s) => 
                               ....//some operations
                  case _ =>
                            println("no state")
                            ..return some state
    
    state.update(newState)

    //set the timeout,Does state also gets removed automatically when state has timed out?
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs,"10 seconds")

}

现在考虑一个水印设置为(10秒)的例子:
带有 ts 12 seconds
的传入数据 (data1) 带有 ts 20 seconds
的传入数据 (data1) 所以水印到这里将是 (20-10) = 10 秒

带有 ts 12 seconds 的传入数据 (data2)
(data2) 的状态将在 20 seconds 超时
(如 10 秒(水印时间)+ 10 秒(我们设置了额外的超时时间)

所以如果传入的数据(data1)带有 ts 20 seconds
lly,传入数据 (data1) 与 ts 30 seconds
lly,传入数据 (data1) 与 ts 40 seconds
到这里,水印现在是20秒。 (40-10)

所以 data2 的状态是超时,因为最后一个数据长达 12 秒

问。当 data2 的状态超时时,状态只是超时还是状态被删除


因为它没有打印 println("State has timedout")
它打印了println("no state")

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)