问题描述
我有一个流式 Spark 应用程序,其中正在运行的流我正在使用带有 flatMapGroupsWithState
的状态聚合删除重复的行。
但是当我在流上使用 forEachBatch
并使用我创建的相同函数来删除流上的重复项时,它将每个 Batch 视为一个独立的实体,并且仅在该单个 Micro Batch 中返回重复项。
代码:
case class User(name: String,userId: Integer)
case class StateClass(totalUsers: Int)
def removeDuplicates(inputData: Dataset[User]): Dataset[User] = {
inputData
.groupByKey(_.userId)
.flatMapGroupsWithState(OutputMode.Append,GroupStateTimeout.ProcessingTimeTimeout)(removeDuplicatesInternal)
}
def removeDuplicatesInternal(id: Integer,newData: Iterator[User],state: GroupState[StateClass]): Iterator[User] = {
if (state.hasTimedOut) {
state.remove() // Removing state since no same UserId in 4 hours
return Iterator()
}
if (newData.isEmpty)
return Iterator()
if (!state.exists) {
val firstUserData = newData.next()
val newState = StateClass(1) // Total count = 1 initially
state.update(newState)
state.setTimeoutDuration("4 hours")
Iterator(firstUserData) // Returning UserData first time
}
else {
val newState = StateClass(state.get.totalUsers + 1)
state.update(newState)
state.setTimeoutDuration("4 hours")
Iterator() // Returning empty since state already exists (Already sent this UserData before)
}
}
我使用的输入流是 userStream
。
当我直接将流传递给它时,上述函数工作正常。
val results = removeDuplicates(userStream)
但是当我做这样的事情时:
userStream
.writeStream
.foreachBatch { (batch,batchId) => writeBatch(batch) }
def writeBatch(batch: Dataset[User]): Unit = {
val distinctBatch = removeDuplicates(batch)
}
我仅在该微批次中获得不同的用户数据。但我希望它在 4 小时的超时时间内整体上是不同的。 例如:
If 1st batch has UserIds (1,3,5,1),and second batch has UserIds (2,1).
Expected BehavIoUr:
Output: 1st Batch = (1,5) and 2nd Batch = (2)
My Output: 1st Batch = (1,5) and 2nd Batch = (2,1)
我怎样才能让它在整个过程中使用相同的状态?现在,它正在对每个微批次进行不同的处理,并为每个批次创建一个单独的状态。
PS:问题不仅限于在Stream上获取重复项,我需要使用forEachBatch对Micro batches进行一些计算,并在写入之前删除重复项。
对于运行测试脚本,请参考:https://ideone.com/nZ5pq2
解决方法
行为实际上是预期的。
flatMapGroupsWithState
仅在查询流式传输时利用状态存储。 (对于批量查询,它甚至不创建状态存储,因为它不是必需的。)一旦调用 forEachBatch
,提供的 batch
参数不再跨批次连续 - 将其视为数据集来自批查询,批的意思是“一个”微批。
因此,您仍然需要将流式数据集传递给 removeDuplicate
,或者使用自己的方式在 forEachBatch
中跨批次删除重复记录。