问题描述
我正在尝试使用 mapGroupsWithState 方法对传入的数据流进行有状态的结构化流传输。但是我面临的问题是,我为 groupByKey 选择的密钥会使我的状态变得太大而又太快。显而易见的解决方法是更改密钥,但是我希望在 update 方法中应用的业务逻辑要求密钥与我现在拥有的密钥完全相同,或者如果可能,请访问 GroupState (所有密钥)。
例如,我有来自不同组织的数据流,通常一个组织包含userId,personId等。请参见下面的代码:
val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout
val statisticStream = stream
.groupByKey(key => key.orgId)
.mapGroupsWithState(noTimeout)(updateUserStatistic)
val df = statisticStream.toDF()
val query = df
.writeStream
.outputMode(Update())
.option("checkpointLocation",s"$checkpointLocation/$name")
.foreach(new UserCountWriter(spark.sparkContext.getConf))
.outputMode(Update())
.queryName(name)
.trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))
案例类:
case class User(
orgId: Long,profileId: Long,userId: Long)
case class UserStatistic(
orgId: Long,kNown: Long,ukNown: Long,userSeq: Seq[User])
更新方法:
def updateUserStatistic(
orgId: Long,newEvents: Iterator[User],oldState: GroupState[UserStatistic]): UserStatistic = {
var state: UserStatistic = if (oldState.exists) oldState.get else UserStatistic(orgId,0L,Seq.empty)
for (event <- newEvents) {
//business logic like checking if userId in this organization is of certain type and then accordingly update the kNown or unkNown attribute for that particular user.
oldState.update(state)
state
}
当我必须在Driver-Executor模型上执行此问题时,问题会变得更加严重,因为我希望每个组织中都有1到1千万用户,这可能意味着一个执行者上有许多州(如果我理解不正确,请纠正我)
可能的解决方案失败:
- 按键作为用户ID分组-因为那样我就无法获得给定orgId的所有userId,因为这些GroupStates放在了聚合键,值对中,这里是UserId。因此,对于每个新的UserId,即使它属于同一组织,也会创建一个新状态。
任何帮助或建议都将受到赞赏。
解决方法
您的状态不断增加,因为在当前实现中,不会从GroupState中删除任何键/状态对。
要完全缓解您所面临的问题(无限增长状态),mapGroupsWithState
方法允许您使用超时。您可以选择两种超时类型:
- 使用
GroupStateTimeout.ProcessingTimeTimeout
和GroupState.setTimeoutDuration()
的处理时间超时,或者 - 使用
GroupStateTimeout.EventTimeTimeout
和GroupState.setTimeoutTimestamp()
的事件时间超时。
请注意,两者之间的区别是基于持续时间的超时和更灵活的基于基于时间的超时。
在特征GroupState
的ScalaDocs中,您将找到一个很好的模板,了解如何在映射函数中使用超时:
def mappingFunction(key: String,value: Iterator[Int],state: GroupState[Int]): String = {
if (state.hasTimedOut) { // If called when timing out,remove the state
state.remove()
} else if (state.exists) { // If state exists,use it for processing
val existingState = state.get // Get the existing state
val shouldRemove = ... // Decide whether to remove the state
if (shouldRemove) {
state.remove() // Remove the state
} else {
val newState = ...
state.update(newState) // Set the new state
state.setTimeoutDuration("1 hour") // Set the timeout
}
} else {
val initialState = ...
state.update(initialState) // Set the initial state
state.setTimeoutDuration("1 hour") // Set the timeout
}
...
// return something
}