将 flatMapGroupsWithState 与 forEachBatch 一起使用

问题描述

我有一个流式 Spark 应用程序,其中正在运行的流我正在使用带有 flatMapGroupsWithState 的状态聚合删除重复的行。 但是当我在流上使用 forEachBatch 并使用我创建的相同函数删除流上的重复项时,它将每个 Batch 视为一个独立的实体,并且仅在该单个 Micro Batch 中返回重复项。

代码

case class User(name: String,userId: Integer)
case class StateClass(totalUsers: Int)

    def removeDuplicates(inputData: Dataset[User]): Dataset[User] = {
      inputData
        .groupByKey(_.userId)
        .flatMapGroupsWithState(OutputMode.Append,GroupStateTimeout.ProcessingTimeTimeout)(removeDuplicatesInternal)
    }

    def removeDuplicatesInternal(id: Integer,newData: Iterator[User],state: GroupState[StateClass]): Iterator[User] = {
      if (state.hasTimedOut) {
        state.remove() // Removing state since no same UserId in 4 hours
        return Iterator()
      }
      if (newData.isEmpty)
        return Iterator()

      if (!state.exists) {
        val firstUserData = newData.next()
        val newState = StateClass(1) // Total count = 1 initially
        state.update(newState)
        state.setTimeoutDuration("4 hours")
        Iterator(firstUserData) // Returning UserData first time
      }
      else {
        val newState = StateClass(state.get.totalUsers + 1)
        state.update(newState)
        state.setTimeoutDuration("4 hours")
        Iterator() // Returning empty since state already exists (Already sent this UserData before)
      }
    }

我使用的输入流是 userStream。 当我直接将流传递给它时,上述函数工作正常。

val results = removeDuplicates(userStream)

但是当我做这样的事情时:

userStream
.writeStream
.foreachBatch { (batch,batchId) => writeBatch(batch) } 

def writeBatch(batch: Dataset[User]): Unit = {
  val distinctBatch = removeDuplicates(batch)
}

我仅在该微批次中获得不同的用户数据。但我希望它在 4 小时的超时时间内整体上是不同的。 例如:

If 1st batch has UserIds (1,3,5,1),and second batch has UserIds (2,1). 
Expected BehavIoUr: 
  Output: 1st Batch = (1,5) and 2nd Batch = (2)
My Output: 1st Batch = (1,5) and 2nd Batch = (2,1)

我怎样才能让它在整个过程中使用相同的状态?现在,它正在对每个微批次进行不同的处理,并为每个批次创建一个单独的状态。

PS:问题不仅限于在Stream上获取重复项,我需要使用forEachBatch对Micro batches进行一些计算,并在写入之前删除重复项。

对于运行测试脚本,请参考:https://ideone.com/nZ5pq2

解决方法

行为实际上是预期的。

flatMapGroupsWithState 仅在查询流式传输时利用状态存储。 (对于批量查询,它甚至不创建状态存储,因为它不是必需的。)一旦调用 forEachBatch,提供的 batch 参数不再跨批次连续 - 将其视为数据集来自批查询,批的意思是“一个”微批。

因此,您仍然需要将流式数据集传递给 removeDuplicate,或者使用自己的方式在 forEachBatch 中跨批次删除重复记录。