无法将解析的 xml 数据插入到具有更改输入架构的 spark 增量表中

问题描述

我正在尝试将数据帧中的数据插入到增量表中。最初,我正在解析基于目标模式的 xml 文件并将结果保存到数据帧中。下面是用于解析的代码。

def parseAsset (nodeSeqXml: scala.xml.NodeSeq) : Seq[String] = {
  //convert nodeseq to xml
  
 
  
  Seq(  (nodeSeqXml \ "AMS").\@("Pro"),(nodeSeqXml \ "AMS").\@("Prod"),(nodeSeqXml \ "AMS").\@("Asset"),(nodeSeqXml \ "AMS").\@("Descrn"),(nodeSeqXml \ "AMS").\@("Creation_Dt"),(nodeSeqXml \ "AMS").\@("Provider"),(nodeSeqXml \ "AMS").\@("AssetD"),(nodeSeqXml \ "AMS").\@("lass"),(nodeSeqXml \ "AMS").\@("hyu"),((nodeSeqXml \ "App_Data" ).map(d => ((d \\ "@Name").text + "@-" + (d \\ "@Value").text))).mkString("!-"))
}


val AssetXml = XML.loadFile("filepath/filename")
 
val metadataNodeSeqLst = (AssetXml \\ "Metadata")
var records: Seq[String] = Seq()
 //for each of Metadata tag
metadataNodeSeqLst.foreach(nodeSeqXml => {
  records = records :+ parseAsset(nodeSeqXml).mkString("%-")
})


val AssetDF = records.toDF("ETY_Asset")

在这一步之后,我将拆分列并分解数组列,最后将数据保存到数据帧中,然后我使用下面的方法将此数据插入到增量表中。

outputparse.write.format("delta").mode("append").option("mergeSchema","true").insertInto("targettable")

如果源文件的列数与目标文件的列数相同,这可以正常工作。但在这种情况下,会有不同模式的不同文件将作为输入传递给解析代码。例如,目标架构有 77 列,如果传入文件有 65 列,并且在将数据插入增量表时,我会收到以下错误。

org.apache.spark.sql.AnalysisException: Cannot write to 'target',not enough data columns; target table has 74 column(s) but the inserted data has 65 column(s);

像这样,我得到具有不同输入模式的文件,但我的目标模式是不变的。所以,基本上我需要将 Null 传递给缺失的字段。我知道在将数据写入数据帧之前,我需要在解析代码中进行架构比较。能否请您告诉我如何实现这一点以及在解析代码中的何处合并此逻辑。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...