问题描述
{
"header": {
"p1": "abcd","p2": 5170,},"data": [{
"name": "propoertyName","value": 10,}]
}
下面是我写的scala代码-
val incomingStream = spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
val messages =incomingStream.withColumn("Body",$"body".cast(StringType)).select("Body")
val query2 = messages.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation","_")
.start("/delta/data/")
query2.awaitTermination()
使用此代码,字符串将保存在三角洲湖中。
当我使用sql语法查询它时-使用-我可以看到字符串。因此,以下查询效果很好。
SELECT star FROM delta./delta/data/
但是我也想基于内部参数进行查询-这不起作用。我如何将JSON存储到三角洲湖,因为它是对如下所示的参数进行查询。所以我想编写以下查询,这给了我错误。
SELECT Body.header.P1 FROM delta./delta/data/
在文档或Twitter流的一些示例中,只有字符串示例。我没有找到json示例和相关的操作。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)