Structured Streaming 的连续触发器(Continuous Trigger)在 Spark 3 上失败,但在 Spark 2.4 上正常工作——这是一个 bug 吗?

问题描述

我以 Kinesis 为数据源完成了一个 Spark 结构化流的 POC,它在我当时本地安装的 Spark 2.4.5 上运行正常。

后来我不得不升级到 Spark 3.0.0(也用 3.0.1 测试过),从那时起程序开始失败,并报出以下错误:

Continuous processing does not support StreamingRelation operations.;;
kinesis
org.apache.spark.sql.AnalysisException: Continuous processing does not support StreamingRelation operations.;;
kinesis
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForContinuous$1(UnsupportedOperationChecker.scala:408)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForContinuous$1$adapted(UnsupportedOperationChecker.scala:390)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForContinuous(UnsupportedOperationChecker.scala:390)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:290)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)
    at com.niceic.dl.kinesispuller.RunnerErr$.main(RunnerErrorSpark3.scala:50)
    at com.niceic.dl.kinesispuller.Spark3ErrorTest.$anonfun$new$2(test.scala:32)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
    at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
    at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
    at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1563)
    at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182)
    at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1563)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
    at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
    at org.scalatest.Suite.run(Suite.scala:1112)
    at org.scalatest.Suite.run$(Suite.scala:1094)
    at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
    at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
    at com.niceic.dl.kinesispuller.Spark3ErrorTest.org$scalatest$BeforeAndAfterAllConfigMap$$super$run(test.scala:24)
    at org.scalatest.BeforeAndAfterAllConfigMap.liftedTree1$1(BeforeAndAfterAllConfigMap.scala:248)
    at org.scalatest.BeforeAndAfterAllConfigMap.run(BeforeAndAfterAllConfigMap.scala:245)
    at org.scalatest.BeforeAndAfterAllConfigMap.run$(BeforeAndAfterAllConfigMap.scala:242)
    at com.niceic.dl.kinesispuller.Spark3ErrorTest.run(test.scala:24)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
    at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:41)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

我在 IntelliJ 中运行我的代码(Spark 是通过 Homebrew 安装在本地的):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

/**
 * Minimal reproduction of the continuous-trigger failure described in this post.
 *
 * Reads a stream through the "kinesis" source format and writes it to the
 * console sink in append mode using `Trigger.Continuous`.
 */
object RunnerErr {
  /**
   * Builds a local SparkSession, starts the streaming query and blocks until
   * the query terminates.
   */
  def main(): Unit = {
    val spark = SparkSession
      .builder
      .appName("Kinesisspark3Error")
      .master("local[*]")
      .getOrCreate()

    // Options handed to the "kinesis" source; credentials are placeholders.
    val kinesisSqlOptions = Map(
      "streamName" -> "shards2",
      "endpointUrl" -> "https://kinesis.us-west-2.amazonaws.com",
      "awsAccessKeyId" -> "<awsAccessKeyId>",
      "awsSecretKey" -> "<awsSecretKey>",
      "startingPosition" -> "latest"
    )

    val df = spark.readStream
      .format("kinesis")
      .options(kinesisSqlOptions)
      .load

    df.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.Continuous("2 seconds")) //<-works on 2.4.5 only
//      .trigger(Trigger.ProcessingTime("2 seconds")) //<-works on both
      .start()
      .awaitTermination()
  }
}

在 2.4 和 3.0 版本之间的 Spark 文档中,我没有看到与连续触发器相关的任何更改。错误来自 UnsupportedOperationChecker.scala:

  def checkForContinuous(plan: LogicalPlan,outputMode: OutputMode): Unit = {
    checkForStreaming(plan,outputMode)

    plan.foreachUp { implicit subPlan =>
      subPlan match {
        case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
              _: DeserializetoObject | _: SerializefromObject | _: SubqueryAlias |
              _: TypedFilter) =>
        case node if node.nodeName == "StreamingrelationV2" =>
        case Repartition(1,false,_) =>
        case node: Aggregate =>
          val aboveSinglePartitionCoalesce = node.find {
            case Repartition(1,_) => true
            case _ => false
          }.isDefined

          if (!aboveSinglePartitionCoalesce) {
            throwError(s"In continuous processing mode,coalesce(1) must be called before " +
              s"aggregate operation ${node.nodeName}.")
          }
        case node =>
          throwError(s"Continuous processing does not support ${node.nodeName} operations.")
      }

在调试器中可以看到,程序进入了上述代码段的最后一个 case node 分支,此时 node.sourceName = "kinesis"。再次强调:同样的代码在 Spark 2.4.5 上正常工作,在 3.0.0 和 3.0.1 上失败。

使用的 Kinesis Spark 连接器:

  • 对于 Spark 2.4.5 是 "com.qubole.spark" %% "spark-sql-kinesis" % "1.2.0_spark-2.4"
  • 对于 Spark 3+ 是 "com.qubole.spark" %% "spark-sql-kinesis" % "1.2.0_spark-3.0"

到目前为止,我认为这是一个 bug——你怎么看?任何意见都将不胜感激,谢谢!

更新 1:我在 Spark 源代码中发现 DataStreamReader 的 load 方法发生了变化:对于 Kinesis 源,它在 Spark 3 中返回 StreamingRelation,而在 Spark 2.4.5 中返回 StreamingRelationV2。之后对数据帧的计划调用 checkForContinuous 方法时,它能够处理 StreamingRelationV2,但遇到 StreamingRelation 就会抛出异常。

更新 2:Qubole 的答复:“Spark 3 对连续流所依赖的 Data Source V2 API 进行了重大更改。为了让连接器支持 Spark 3,我们不得不移除连续流相关的代码。”(#92)

解决方法

暂未找到可以解决该程序问题的有效方法,小编正在努力寻找和整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)