(spark-xml) 使用 from_xml 函数解析 xml 列时只接收空值

问题描述

我正在尝试使用 spark-xml 解析一个非常简单的 XML 字符串列,但即使正确填充了 XML,我也只能接收到 null 值。

我用来解析 xml 的 XSD 是:

<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="note">
        <xs:complexType>
            <xs:sequence>
                <xs:element type="xs:string" name="from"/>
                <xs:element type="xs:string" name="to"/>
                <xs:element type="xs:string" name="message"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>
</xs:schema>

虽然列中的 XML 以字符串形式显示如下,但每个标签都正确填充:

<?xml version="1.0" encoding="UTF-8"?>
<note>
    <from>Jani</from>
    <to>Tove</to>
    <message>Remember me this weekend</message>
</note>

我用 Scala 编写的 Spark 代码是这样的:

    // XML Schema
    val schema = XSDToSchema.read("<the XSD as string>")
    
    // Spark Structured Streaming (N.b. the column value contains the xml as string)
    import spark.implicits._
    var df = initSource(spark)
      .withColumn("parsed",from_xml($"value",schema,Map(
              "mode" -> "FAILFAST","nullValue"-> "","rowTag" -> "note","ignoreSurroundingSpaces" -> "true"
          )
      ))
      .select($"value",$"parsed.note.from",$"parsed.note.to",$"parsed.note.message")
      .writeStream
      .format("console")
      // .option("mode","FAILFAST")
      // .option("nullValue","")
      // .option("rowTag","note")
      // .option("ignoreSurroundingSpaces","true")
      .outputMode("append")
      .start()
      .awaitTermination(30*1000)

打印此数据帧的模式(在 select 语句之前)将给出预期的模式

root
 |-- value: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- note: struct (nullable = false)
 |    |    |-- from: string (nullable = false)
 |    |    |-- to: string (nullable = false)
 |    |    |-- message: string (nullable = false)

但是在控制台中打印结果时,我得到的只是 null 值,如下所示:

....
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+----+----+-------+
|               value|from|  to|message|
+--------------------+----+----+-------+
|<?xml version="1....|null|null|   null|
|<?xml version="1....|null|null|   null|
|<?xml version="1....|null|null|   null|
....

我不认为它是相关的,但这个 xml 专栏的来源来自阅读定义如下的 Kafka 主题:

    def initSource(spark: SparkSession) : DataFrame = {
        spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers","localhost:9092")
          .option("startingoffsets","earliest")
          .option("subscribe","my-test-topic")
          .load()
          .selectExpr("CAST(value AS STRING)")
    }

有没有其他人遇到过这个问题并解决了它?我的选择不多了,我真的很感激这方面的提示:)

我使用的 spark-xml 版本是最新的一个 atm,0.12.0 和 spark 3.1.1

更新

我在调用 writeStream 后错误地传递了 spark-xml 选项,而是需要将它们作为 from_xml 函数的第三个参数传递。我仍然只得到空值...

解决方法

最后让我大开眼界的是阅读 spark-xml documentation 中提到的部分:

XSD 文件的路径,用于单独验证每一行的 XML

这意味着模式匹配是通过每一行而不是整个 XML 完成的,在这种情况下,我的示例的模式需要类似于以下内容:

val schema = StructType(Array(
        StructField("from",StringType,nullable = true),StructField("to",StructField("message",nullable = true)))

也可以使用 XSD 来完成:

<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element type="xs:string" name="from"/>
    <xs:element type="xs:string" name="to"/>
    <xs:element type="xs:string" name="message"/>
</xs:schema>

这两种声明模式的方法对我有用。希望对以后的人有所帮助。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...