Azure事件中心-Scala笔记本中的JSON数据处理-数据块

问题描述

{
    "header": {
        "p1": "abcd","p2": 5170,},"data": [{
            "name": "propoertyName","value": 10,}]
}

下面是我写的scala代码-

val incomingStream  = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

val messages =incomingStream.withColumn("Body",$"body".cast(StringType)).select("Body")

val query2  = messages.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation","_")
  .start("/delta/data/")

 query2.awaitTermination()

使用此代码,字符串将保存在三角洲湖中。

当我使用sql语法查询它时-使用-我可以看到字符串。因此,以下查询效果很好。 SELECT star FROM delta./delta/data/

但是我也想基于内部参数进行查询-这不起作用。我如何将JSON存储到三角洲湖,因为它是对如下所示的参数进行查询。所以我想编写以下查询,这给了我错误

SELECT Body.header.P1 FROM delta./delta/data/

在文档或Twitter流的一些示例中,只有字符串示例。我没有找到json示例和相关的操作。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)