Problem description
I am struggling to understand the timeout settings for Spark Structured Streaming when using mapGroupsWithState.
The link below has a very detailed specification, but I am not sure I understand it correctly, especially the GroupState.setTimeoutTimestamp()
option, i.e., what it means to set a state expiration in terms of event time.
https://spark.apache.org/docs/3.0.0-preview/api/scala/org/apache/spark/sql/streaming/GroupState.html
I copy the relevant part here:
With EventTimeTimeout, the user also has to specify the event time watermark in the query using Dataset.withWatermark().
With this setting, data that is older than the watermark are filtered out.
The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout would occur when the watermark advances beyond the set timestamp.
You can control the timeout delay by two parameters - watermark delay and an additional duration beyond the timestamp in the event (which is guaranteed to be newer than the watermark due to the filtering).
Guarantees provided by this timeout are as follows:
Timeout will never occur before the watermark has exceeded the set timeout.
Similar to processing time timeouts, there is no strict upper bound on the delay when the timeout actually occurs. The watermark can advance only when there is data in the stream, and the event time of the data has actually advanced.
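The first guarantee quoted above can be boiled down to a tiny predicate. The sketch below models only that rule (it is not Spark's implementation, and the name TimeoutRule is illustrative): a group's timeout fires only once the watermark has advanced beyond the timestamp set for that group.

```scala
// Minimal model of the EventTimeTimeout guarantee quoted above:
// "Timeout will never occur before the watermark has exceeded the set timeout."
// Illustrative sketch, not Spark source code.
object TimeoutRule {
  // Both arguments are epoch milliseconds.
  def hasTimedOut(currentWatermarkMs: Long, timeoutTimestampMs: Long): Boolean =
    currentWatermarkMs > timeoutTimestampMs
}
```

Note that as long as the watermark has not strictly passed the set timestamp, the group stays alive, which is why the docs only guarantee a lower bound on when the timeout happens.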
Question 1:
What is the timestamp in the sentence "and the timeout would occur when the watermark advances beyond the set timestamp"?
Is it an absolute time, or a duration relative to the current event time of that state?
For example, say I have a state like the one below; with what settings and what value can I make it expire, and when would that happen?
+-------+-----------+-------------------+
|expired|something | timestamp|
+-------+-----------+-------------------+
| false| someKey |2020-08-02 22:02:00|
+-------+-----------+-------------------+
Question 2:
Reading the sentence "Data that is older than the watermark are filtered out", my understanding is that late-arriving data read from Kafka will be ignored. Is that correct?
Why I am asking
Without understanding this, I cannot really apply it to a use case, i.e., when to use GroupState.setTimeoutDuration()
and when to use GroupState.setTimeoutTimestamp().
Thanks a lot in advance.
PS. I also tried reading the following:
- https://www.waitingforcode.com/apache-spark-structured-streaming/stateful-transformations-mapgroupswithstate/read (confused me, did not understand it)
- https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html (did not cover much of what I was interested in)
Solution
What is the timestamp in the sentence "and the timeout would occur when the watermark advances beyond the set timestamp"?
It is the timestamp you set with GroupState.setTimeoutTimestamp().
Is it an absolute time, or a duration relative to the current event time of that state?
It is a time relative to the current batch window (not a duration).
Say I have a state with column timestamp=2020-08-02 22:02:00; with what settings and what value can I make it expire, and when would that happen?
Let's assume your sink query has a processing trigger of 5 minutes (set via trigger()). Also assume that a watermark is applied before using groupByKey and mapGroupsWithState. I understand that you want to use timeouts based on event time (not processing time), so your query would look something like:

ds.withWatermark("timestamp", "10 minutes")
  .groupByKey(...) // declare your key
  .mapGroupsWithState(
    GroupStateTimeout.EventTimeTimeout)(
    ...) // your custom update logic
Now, it depends on how the timeout timestamp is set within that "custom update logic". Somewhere in your update logic you will need to call

state.setTimeoutTimestamp()

This method has four different signatures, and it is worth reading their documentation. Since we have set a watermark (with withWatermark), we can actually make use of that time. As a general rule: it is important to set the timeout timestamp (via state.setTimeoutTimestamp()) to a value larger than the current watermark. To continue our example, we add one hour, as shown below:

state.setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour")
Finally, a message can enter the stream between 22:00:00 and 22:15:00, and if that message is the last one for its key, the key will time out in your GroupState at 23:15:00.
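The arithmetic behind the two-argument call state.setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour") can be sketched as: the effective timeout timestamp is the current watermark plus the additional duration, so the timeout can only fire once the watermark has moved past that point. The object name TimeoutMath below is illustrative, not part of Spark's API.

```scala
// Sketch of the timeout-timestamp arithmetic used in the example above
// (illustrative model, not Spark source code).
object TimeoutMath {
  val OneHourMs: Long = 60L * 60L * 1000L

  // Both values are epoch milliseconds, matching the unit of
  // GroupState.getCurrentWatermarkMs.
  def timeoutTimestampMs(currentWatermarkMs: Long, additionalMs: Long): Long =
    currentWatermarkMs + additionalMs
}
```

Because the base value is the watermark itself, this always satisfies the rule above that the timeout timestamp must be larger than the current watermark.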
Question 2: Reading the sentence "Data that is older than the watermark are filtered out", my understanding is that late-arriving data read from Kafka will be ignored. Is that correct?

Yes, that is correct. For the batch interval 22:00:00-22:05:00, all messages with an event time (defined by the column timestamp) within that interval that arrive later than the declared 10-minute watermark allows (i.e., after 22:15:00) will be ignored by your query and will not be processed in your "custom update logic".
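The filtering described above can be sketched as follows, under the usual assumed semantics (not Spark source code): the watermark trails the maximum event time seen so far by the declared delay, and an event whose event time falls behind that watermark is dropped. The object name WatermarkFilter is illustrative.

```scala
// Model of watermark-based late-data filtering (illustrative sketch).
object WatermarkFilter {
  val TenMinutesMs: Long = 10L * 60L * 1000L

  // Watermark = maximum event time observed so far minus the watermark delay.
  def currentWatermarkMs(maxEventTimeMs: Long, delayMs: Long): Long =
    maxEventTimeMs - delayMs

  // An event with an event time older than the watermark is filtered out.
  def isDropped(eventTimeMs: Long, watermarkMs: Long): Boolean =
    eventTimeMs < watermarkMs
}
```

With a 10-minute delay, once an event with event time 22:15:00 has been seen, the watermark sits at 22:05:00, and a subsequently arriving event stamped 22:02:00 would be dropped.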