Using mapGroupsWithState in Spark Structured Streaming gives java.lang.NullPointerException

Problem Description

Suppose I have a stream of events consisting of JSON like the following, which gets converted into a case class, e.g.

{"id" : "IdOfUser","title": "Title1"}

maps to

case class UserEvent(id: Int,title: String)

I also have a collection in MongoDB containing metadata about each title, used as a lookup table, e.g.

  val lookup_df =  sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.TitleMetaData")))

I am also using mapGroupsWithState to keep track of state (https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html).

My stateful session looks like this:

case class UserSession(var visited: collection.Map[String,Boolean], var size: Int)
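
For context, the update function that mapGroupsWithState expects takes the group key, an iterator over that key's new events in the current trigger, and a mutable state handle. With the types used in this post it has roughly this shape (a minimal sketch of the API, not code from the post):

    // (key, new events for this key in the trigger, state handle) => output
    def update(key: Int,
               events: Iterator[UserEvent],
               state: GroupState[UserSession]): UserSession = ???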

My main problem seems to be that the lookup table disappears whenever I try to run an action on it inside the mapping function. The code below reads the DataFrame as a global variable on an object; with it, I get a NullPointerException.

object StreamState{ 
      val sparkSession = SparkSession.builder()
      .master("local")
      .appName("StreamState")
      .getOrCreate()

      val new_data = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.TitleMetaData")))
    }
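
For illustration, a common way to sidestep this (a hypothetical sketch, not from the original post, and assuming the metadata collection is small enough to collect to the driver) is to materialize the lookup DataFrame into a plain Scala Map once on the driver and broadcast it, so the state-update function never touches a DataFrame or the SparkSession on an executor:

      // Collect the lookup table once on the driver into a plain Map;
      // getInt assumes "size" is stored as a 32-bit integer column.
      val titleSizes: Map[String, Int] = new_data
        .select("title", "size")
        .collect()
        .map(row => row.getString(0) -> row.getInt(1))
        .toMap

      // Broadcast it; executors read titleSizesBc.value, which is an
      // ordinary in-memory lookup and involves no Spark action.
      val titleSizesBc = sparkSession.sparkContext.broadcast(titleSizes)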

This StreamState object has the following functions (updateUserStateWithEvent throws the error when collect runs).

This is where the error occurs:

    def updateUserStateWithEvent(state: UserSession, event: UserEvent): UserSession = {
      import sparkSession.implicits._
      val current_event = lookup_df.filter($"title" === event.url)
      // this collect line gives me an error
      val size = current_event.select("size").as[Int].collect()(0)
      val empty_map = Map[String,Boolean]()
      state.visited = empty_map
      state.size = size
      state
    }

I have kept this helper function basic so we don't get bogged down in logic; the size field of the state is simply updated from the table read up front (from MongoDB). Here is my mapping function:

def updateAcrossEvents(user:Int,events: Iterator[UserEvent],oldState: GroupState[UserSession]):UserSession = {


      var state:UserSession = if (oldState.exists) {
        
        println(oldState.get.visited)
        oldState.get
      }
      else {
        val empty_map = Map[String,Boolean]()


        val empty_session = UserSession(empty_map,0)
        empty_session
      }
      
      import sparkSession.implicits._
      for (event <- events) {
        state = updateUserStateWithEvent(state,event)
        oldState.update(state)
      }
      state
    }
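
As an aside, since the query is started with GroupStateTimeout.ProcessingTimeTimeout(), a state function like this would normally also arm and check the timeout on the GroupState handle, which the code above never does. A minimal sketch of that pattern, with an arbitrary example duration:

      if (oldState.hasTimedOut) {
        oldState.remove()                          // drop the expired session
      } else {
        oldState.update(state)
        oldState.setTimeoutDuration("30 minutes")  // re-arm on each trigger
      }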

Finally, here is the main function on my stream state object (adapted from https://blog.yuvalitzchakov.com/exploring-stateful-streaming-with-spark-structured-streaming/; deserializeUserEvent can be found in that blog post).

def main(args: Array[String]): Unit = {

      sparkSession.sparkContext.setLogLevel("WARN")
      import sparkSession.implicits._

      val userEventsStream = sparkSession.readStream
        .format("socket")
        .option("host","localhost")
        .option("port",12345)
        .load()
        .as[String]


      val finishedUserSessionsStream: Dataset[UserSession] =
        userEventsStream
          .map(deserializeUserEvent)
          .groupByKey(_.id)
          .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
            updateAcrossEvents)

      finishedUserSessionsStream.writeStream
        .outputMode(OutputMode.Update())
        .format("console")
        .option("checkpointLocation","checkpoint")
        .option("truncate",false)
        .start()
        .awaitTermination()


}

This aborts the task with java.lang.NullPointerException.

I would like to know why this error happens and how to fix it. Does it have something to do with the mapping function taking an iterator of events, or do global variables simply not work in a streaming context?
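
For comparison, with the broadcast Map sketched earlier (reusing the hypothetical titleSizesBc name), the update function would perform no Spark action on the executor at all:

    def updateUserStateWithEvent(state: UserSession, event: UserEvent): UserSession = {
      // plain in-memory lookup instead of DataFrame filter + collect
      val size = titleSizesBc.value.getOrElse(event.url, 0)
      state.visited = Map[String,Boolean]()
      state.size = size
      state
    }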

Here is the full code:

import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._


import org.apache.spark.sql.{DataFrame,Dataset,Encoder,Encoders,SparkSession}

import user.{UserEvent,UserSession,USession}

import scala.collection.{mutable,_}
import argonaut.Argonaut._
import org.apache.spark.sql.streaming.{GroupState,GroupStateTimeout,OutputMode}

import scala.collection.Map

object StreamStateThree {


  val sparkSession = SparkSession.builder()
    .master("local")
    .appName("StreamStateThree")
    //.config("spark.mongodb.input.uri","mongodb://127.0.0.1/blog.articles")
    //.config("spark.mongodb.output.uri","mongodb://127.0.0.1/blog.vectors")
    .getOrCreate()

  sparkSession.sparkContext.setLogLevel("WARN")
  import sparkSession.implicits._
  val sc = sparkSession.sparkContext

  val new_data = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.cleanVectors")))
  // NB: the Broadcast handle returned here is discarded, so nothing below reads it
  sparkSession.sparkContext.broadcast(new_data)

  def updateUserStateWithEvent(statez:USession,event:UserEvent):USession = {
    println("Updating")

    val empty_map = Map[String,Boolean]()
    val empty_rec: Array[String] = Array("")
    val current_event = new_data.filter($"title" === event.url)
    val size:Int = current_event.select("size").as[Int].collect()(0)
    //.limit(1)
    val empty_session = USession(empty_map,-7)
    empty_session
  }


  def updateAcrossEvents(user:Int,events: Iterator[UserEvent],oldState: GroupState[USession]):USession = {
    var state:USession = if (oldState.exists) {
      println("State exists with the following visited")
      oldState.get
    }
    else {
      println("State does not exist")


      val empty_map = Map[String,Boolean]()
      val empty_session = USession(empty_map,-7)
      empty_session
    }
    // we simply specify an old date that we can compare against and
    // immediately update based on the values in our data

    for (event <- events) {
      state = updateUserStateWithEvent(state,event)
      oldState.update(state)
    }
    state
  }

  def deserializeUserEvent(json: String): UserEvent = {
    json.decodeEither[UserEvent] match {
      case Right(userEvent) => userEvent
      case Left(error) =>
        println(s"Failed to parse user event: $error")
        UserEvent.empty
    }
  }


def main(args: Array[String]): Unit = {
    //new_data2.show(20,false)
  val userEventsStream = sparkSession.readStream
    .format("socket")
    .option("host","localhost")
    .option("port",12346)
    .load()
    .as[String]



  val finishedUserSessionsStream =
    userEventsStream
      .map(deserializeUserEvent)
      .groupByKey(_.id)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents)


  finishedUserSessionsStream.writeStream
    .outputMode(OutputMode.Update())
    .format("console")
    .option("checkpointLocation","checkpoint")
    .option("truncate",false)
    .start()
    .awaitTermination()
}

}

Here is the case class for USession:

package user
import org.apache.spark.sql.Dataset

import scala.collection.Map
case class USession(var visited: collection.Map[String,Boolean], var size: Int)

And the event:

package user

import argonaut.Argonaut._
import argonaut.CodecJson


case class UserEvent(id: Int,url: String)
object UserEvent {
  implicit def codec: CodecJson[UserEvent] =
    casecodec2(UserEvent.apply,UserEvent.unapply)("id","url")

  lazy val empty = UserEvent(-1,"")
}
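
For what it's worth, the codec can be sanity-checked with a quick round trip (a hypothetical snippet, not from the post; the JSON field names must match the codec's "id" and "url"):

    import argonaut.Argonaut._

    // decodes to Right(UserEvent(1,Title1)); a field-name mismatch
    // would instead come back as a Left with the parse error
    val ok = """{"id": 1, "url": "Title1"}""".decodeEither[UserEvent]
    println(ok)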
