Problem description
Let's say I have a stream of events: JSON like the following, which gets converted into a case class,
e.g.
{"id" : "IdOfUser","title": "Title1"}
to
case class UserEvent(id: Int,title: String)
I also have a collection in MongoDB containing metadata about each title, which is used as a lookup table, e.g.
val lookup_df = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.TitleMetaData")))
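As a quick aside, one way to sanity-check that this lookup table loads with the expected shape is a driver-side peek before any streaming starts; this is only a sketch, assuming the collection exposes the title and size columns that the filtering code below relies on:
lookup_df.printSchema()
// show a few rows of the columns the state-update logic needs
lookup_df.select("title", "size").show(5, truncate = false)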
I am also tracking state with mapGroupsWithState (https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html).
My stateful session looks like this:
case class UserSession(var visited: collection.Map[String,Boolean], var size: Int)
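Both fields are vars, so the session is meant to be mutated in place as events arrive; purely for illustration (not code from the question):
val s = UserSession(Map("Title1" -> true), 1)
s.visited = s.visited + ("Title2" -> true) // mark a second title as visited
s.size = 2                                 // hypothetical new size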
My main problem seems to be that the lookup table disappears whenever I try to run an action on it inside the mapping function. The code below shows what happens when I read the DataFrame as a global value in an object; it gives me a NullPointerException.
object StreamState{
val sparkSession = SparkSession.builder()
.master("local")
.appName("StreamState")
.getOrCreate()
val new_data = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.TitleMetaData")))
}
The StreamState object has the following function (updateUserStateWithEvent throws the error when collect is run).
The error happens here:
def updateUserStateWithEvent(state: UserSession, event: UserEvent): UserSession = {
  import sparkSession.implicits._
  val current_event = lookup_df.filter($"title" === event.url)
  // this collect line gives me an error
  val size = current_event.select("size").as[Int].collect()(0)
  val empty_map = Map[String,Boolean]()
  state.visited = empty_map
  state.size = size
  state
}
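For context, a common pattern when the lookup table is small is to materialize it on the driver once and let the state-update function close over a plain Scala value instead of a DataFrame. A minimal sketch of that idea, assuming the same title and size columns (this is not the question's code):
// requires import sparkSession.implicits._ for the tuple encoder
val titleSizes: Map[String, Int] =
  lookup_df.select("title", "size").as[(String, Int)].collect().toMap
// optionally broadcast it, keeping the returned handle
val titleSizesBc = sparkSession.sparkContext.broadcast(titleSizes)

def updateUserStateWithEvent(state: UserSession, event: UserEvent): UserSession = {
  // only plain Scala values are touched here, no Spark actions
  state.visited = Map[String, Boolean]()
  state.size = titleSizesBc.value.getOrElse(event.url, 0)
  state
}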
I have kept this helper function basic so we don't get bogged down in logic; the state's size field is simply changed using the table read at the start (from MongoDB). Here is my mapping function:
def updateAcrossEvents(user:Int,events: Iterator[UserEvent],oldState: GroupState[UserSession]):UserSession = {
var state:UserSession = if (oldState.exists) {
println(oldState.get.visited)
oldState.get
}
else {
val empty_map = Map[String,Boolean]()
val empty_session = UserSession(empty_map,0)
empty_session
}
import sparkSession.implicits._
for (event <- events) {
state = updateUserStateWithEvent(state,event)
oldState.update(state)
}
state
}
Finally, here is the main function on the StreamState object (adapted from https://blog.yuvalitzchakov.com/exploring-stateful-streaming-with-spark-structured-streaming/). You can find deserializeUserEvent in that blog post.
def main(args: Array[String]): Unit = {
sparkSession.sparkContext.setLogLevel("WARN")
import sparkSession.implicits._
val userEventsStream = sparkSession.readStream
.format("socket")
.option("host","localhost")
.option("port",12345)
.load()
.as[String]
val finishedUserSessionsStream: Dataset[UserSession] =
userEventsStream
.map(deserializeUserEvent)
.groupByKey(_.id)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents)
finishedUserSessionsStream.writeStream
.outputMode(OutputMode.Update())
.format("console")
.option("checkpointLocation","checkpoint")
.option("truncate",false)
.start()
.awaitTermination()
}
This gives me:
Aborting task
java.lang.NullPointerException
I would like to know why this error occurs and how to fix it. Does it have something to do with the mapping function taking an iterator of events, or do global variables simply not work in a streaming context?
Here is the full code so you can reproduce it:
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._
import org.apache.spark.sql.{DataFrame,Dataset,Encoder,Encoders,SparkSession}
import user.{UserEvent,UserSession,USession}
import scala.collection.{mutable,_}
import argonaut.Argonaut._
import org.apache.spark.sql.streaming.{GroupState,GroupStateTimeout,OutputMode}
import scala.collection.Map
object StreamStateThree {
val sparkSession = SparkSession.builder()
.master("local")
.appName("StreamStateThree")
//.config("spark.mongodb.input.uri","mongodb://127.0.0.1/blog.articles")
//.config("spark.mongodb.output.uri","mongodb://127.0.0.1/blog.vectors")
.getOrCreate()
sparkSession.sparkContext.setLogLevel("WARN")
import sparkSession.implicits._
val sc = sparkSession.sparkContext
val new_data = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.cleanVectors")))
sparkSession.sparkContext.broadcast(new_data)
def updateUserStateWithEvent(statez:USession,event:UserEvent):USession = {
println("Updating")
val empty_map = Map[String,Boolean]()
val empty_rec: Array[String] = Array("")
val current_event = new_data.filter($"title" === event.url)
val size:Int = current_event.select("size").as[Int].collect()(0)
//.limit(1)
val empty_session = USession(empty_map,-7)
empty_session
}
def updateAcrossEvents(user:Int,events: Iterator[UserEvent],oldState: GroupState[USession]):USession = {
var state:USession = if (oldState.exists) {
println("State exists with the following visited")
oldState.get
}
else {
println("State does not exist")
val empty_map = Map[String,Boolean]()
val empty_session = USession(empty_map,-7)
empty_session
}
// we simply specify an old date that we can compare against and
// immediately update based on the values in our data
for (event <- events) {
state = updateUserStateWithEvent(state,event)
oldState.update(state)
}
state
}
def deserializeUserEvent(json: String): UserEvent = {
json.decodeEither[UserEvent] match {
case Right(userEvent) => userEvent
case Left(error) =>
println(s"Failed to parse user event: $error")
UserEvent.empty
}
}
def main(args: Array[String]): Unit = {
//new_data2.show(20,false)
val userEventsStream = sparkSession.readStream
.format("socket")
.option("host","localhost")
.option("port",12346)
.load()
.as[String]
val finishedUserSessionsStream =
userEventsStream
.map(deserializeUserEvent)
.groupByKey(_.id)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents)
finishedUserSessionsStream.writeStream
.outputMode(OutputMode.Update())
.format("console")
.option("checkpointLocation","checkpoint")
.option("truncate",false)
.start()
.awaitTermination()
}
}
Here is the case class for USession:
package user
import org.apache.spark.sql.Dataset
import scala.collection.Map
case class USession(var visited: collection.Map[String,Boolean], var size: Int)
And the event:
package user
import argonaut.Argonaut._
import argonaut.CodecJson
case class UserEvent(id: Int,url: String)
object UserEvent {
implicit def codec: CodecJson[UserEvent] =
casecodec2(UserEvent.apply,UserEvent.unapply)("id","url")
lazy val empty = UserEvent(-1,"")
}
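A quick round-trip check of this codec, mirroring the match in deserializeUserEvent (CodecCheck is just a hypothetical scratch object):
import argonaut.Argonaut._
import user.UserEvent

object CodecCheck extends App {
  // encode a sample event with the codec from the companion object
  val json = UserEvent(1, "Title1").asJson.nospaces
  println(json) // {"id":1,"url":"Title1"}

  // decode it back, as deserializeUserEvent does
  json.decodeEither[UserEvent] match {
    case Right(event) => println(s"parsed: $event")
    case Left(error)  => println(s"failed: $error")
  }
}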