问题描述
我正在使用Spark结构化流技术来从kafka流数据,这为我提供了具有以下架构的数据框
Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int
Value Colum以二进制格式出现在这里,但是它实际上是具有struct类型的json字符串,并且要求读取json struct并屏蔽其中的一些字段并写入数据。
解决方法
您可以按照Structured Streaming + Kafka Integration Guide中给出的准则来了解如何将二进制值转换为字符串值。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1")
.load()
df.selectExpr("CAST(value AS STRING)")
.as[String]
然后,您可以根据实际的json结构定义模式,例如:
val schema: StructType = new StructType()
.add("field1",StringType)
.add("field2",ArrayType(new StructType()
.add("f2",StringType)
.add("f2",DoubleType)
))
然后使用from_json
函数将允许您处理JSON字符串中的数据,请参考documentation,例如:
df.selectExpr("CAST(value AS STRING)")
.select(from_json('json,schema).as("data"))
然后您可以通过使用withColumn
和drop
等结构化API替换列来开始屏蔽。
如果您不想定义整个架构,则可以考虑使用get_json_object
。