delta Lake - 在 pyspark 中插入 sql 失败,出现 java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias

问题描述

Dataproc 集群是使用映像 2.0.x 和增量 io 包 io.delta:delta-core_2.12:0.7.0 创建的

Spark 版本为 3.1.1

Spark shell 启动:

pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

执行创建增量表并插入增量 sql 的命令:

spark.sql("""CREATE TABLE IF NOT EXISTS customer(
             c_id Long,c_name String,c_city String
             )
           USING DELTA LOCATION 'gs://edw-bi-dev-dataexports/delta-table-poc/dt_poc/customer'
         """)

spark.sql("INSERT INTO customer VALUES(1,'Shawn','Tx')")

错误

Traceback (most recent call last):
  File "<stdin>",line 1,in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py",line 719,in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery),self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",line 1305,in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py",line 111,in deco
    return f(*a,**kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",line 328,in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o58.sql.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V
        at org.apache.spark.sql.delta.DeltaAnalysis.$anonfun$normalizeQueryColumns$1(DeltaAnalysis.scala:162)
        at scala.collection.immutable.List.map(List.scala:293)
        at org.apache.spark.sql.delta.DeltaAnalysis.org$apache$spark$sql$delta$DeltaAnalysis$$normalizeQueryColumns(DeltaAnalysis.scala:151)
        at org.apache.spark.sql.delta.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:49)
        at org.apache.spark.sql.delta.DeltaAnalysis$$anonfun$apply$1.applyOrElse(DeltaAnalysis.scala:45)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:45)
        at org.apache.spark.sql.delta.DeltaAnalysis.apply(DeltaAnalysis.scala:40)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:195)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:189)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:154)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:172)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
        at sun.reflect.GeneratedMethodAccessor118.invoke(UnkNown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)

我无法在这里找出问题的根本原因。

解决方法

这是由破坏 Alias 案例类的二进制兼容性的 this change 引起的。对此的修复要么将 Spark 版本降级到 3.0.x,要么等到新的 Delta 版本发布并支持 3.1.x。

附言Delta 中还有其他地方被 Spark 3.1.1 的变化破坏了

更新(2021 年 5 月)1.0.0 版现已与 Spark 3.1 完全兼容

,

看来问题与spark sql目录的设置有关。我尝试了相同的查询,当我删除这个 conf 时它可以工作:

--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

在 dataproc 映像 2.0.1-debian10 上测试:

pyspark --packages io.delta:delta-core_2.12:0.7.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"