什么时候在 Apache Spark StreamingQueryListeners 中触发 onQueryTerminated?

问题描述

我正在开发一个自定义的 StreamingQueryListener,我想在测试中触发它的 onQueryTerminated 方法

这是我尝试实现的:

import org.apache.spark.sql.{ sqlContext,SparkSession }
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{ col,to_date }
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.scalatest.flatspec.AnyFlatSpec

class MyListener extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit       = {}
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit     = {}
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = println(event.exception)
}

class ListenerSpec extends AnyFlatSpec {

  it should "trigger onQueryTerminated" in {
    val spark = SparkSession.builder().master("local[*]").getorCreate()
    spark.streams.addListener(new MyListener())
    implicit val sqlContext: sqlContext = spark.sqlContext

    import spark.implicits._

    val stream = MemoryStream[Int]
    stream.addData(Seq(1,3,4))

    val query = stream
      .toDF()
      .withColumn("columnDoesntExist",to_date(col("names")))
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}

但是,这不起作用,因为它引发了 AnalysisException 但 onQueryTerminated 方法不是由流式查询的终止触发的。

在什么情况下触发该方法并且 event.exception 是 Some(exception)?

更新

以下代码成功触发了onQueryTerminated的执行:

val exceptionUdf = udf(() => throw new Exception())

val query = stream
      .toDF()
      .withColumn("exception",exceptionUdf())
      .writeStream
      .format("console")
      .start()

有关原因的解释,请参阅已接受的答案。

解决方法

根据“Stream Processing with Apache Spark”(由 O'Reilly 出版)一书,onQueryTerminated 方法得到

"在流查询停止时调用。 event 包含与开始事件相关的 idrunId 字段。它还提供了一个 {如果查询因错误而失败,则包含 exception 的 {1}} 字段。”

当您收到 exception 时,您的查询甚至还没有开始。它只到了 Catalyst 优化器的四个阶段中的第一个阶段,也就是“分析”,还没有转化为可运行的代码:

enter image description here

有关 Catalyst Optimizer 的更多详细信息。

AnalysisException 仅表示与目录相关的代码中存在问题,这正是您打算执行的操作:引用不存在的列(在目录中)。

如果您想运行 AnalysisException 方法的执行,您需要实现一个工作代码,但在它已经运行时失败(例如,提供错误的数据输入类型)。