Problem description
I extracted XML files with the Kafka Connect FilePulse connector 1.5.3, and now I want to read them with Spark Streaming to parse/flatten them, since the structure is heavily nested.
The string I read from Kafka (using the console consumer; I inserted a newline before "payload" for readability) looks like this:
{
"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"city"},{"type":"array","items":{"type":"struct","fields":[{"type":"array","field":"unit"},{"type":"string","field":"value"}],"name":"Value"},"name":"ForcedArrayType"},"field":"forcedArrayField"},"field":"lastField"}],"name":"Data","field":"data"}],"optional":true},"payload":{"data":{"city":"someCity","forcedArrayField":[{"value":[{"unit":"unitField1","value":"123"},{"unit":"unitField1","value":"456"}]}],"lastField":"2020-08-02T18:02:00"}}
}
I tried this schema (data types):
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.StringType;

StructType schema = new StructType();
schema = schema.add("schema", StringType, false);
schema = schema.add("payload", StringType, false);
StructType Data = new StructType();
StructType ValueArray = new StructType(new StructField[]{
        new StructField("unit", StringType, true, Metadata.empty()),
        new StructField("value", StringType, true, Metadata.empty())
});
StructType ForcedArrayType = new StructType(new StructField[]{
        new StructField("valueArray", ValueArray, true, Metadata.empty())
});
Data = Data.add("city", StringType, true);
Data = Data.add("forcedArrayField", ForcedArrayType, true);
Data = Data.add("lastField", StringType, true);
StructType Record = new StructType();
Record = Record.add("data", Data, false);
The query I tried:
// below worked for the payload
Dataset<Row> parsePayload = lines
        .selectExpr("cast(value as string) as json")
        .select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
        .select("schemaAndPayload.payload").as("payload");
System.out.println(parsePayload.isStreaming());
// below makes the output empty:
Dataset<Row> parseValue = parsePayload
        .select(functions.from_json(functions.col("payload"), Record).as("cols"))
        .select(functions.col("cols.data.city"));
        //.select(functions.col("cols.*"));
StreamingQuery query = parseValue
.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
query.awaitTermination();
When I output the parsePayload stream, I can see the data (still as a JSON structure), but when I select some/all fields, such as city, the output is empty.
PS. On the console, when I output "parsePayload" instead of "parseValue", it shows some data, which makes me think the "payload" part works:
|{"data":{"city":"...|
...
Solution
Your schema definition does not seem to be entirely correct. I reproduced your problem and was able to parse the JSON with the following schema:
val payloadSchema = new StructType()
.add("data",new StructType()
.add("city",StringType)
.add("forcedArrayField",ArrayType(new StructType()
.add("value",ArrayType(new StructType()
.add("unit",StringType)
.add("value",StringType)))))
.add("lastField",StringType))
To access the individual fields, I used the following select:
val parsePayload = df
.selectExpr("cast (value as string) as json")
.select(functions.from_json(functions.col("json"),schema).as("schemaAndPayload"))
.select("schemaAndPayload.payload").as("payload")
.select(functions.from_json(functions.col("payload"),payloadSchema).as("cols"))
.select(col("cols.data.city").as("city"),explode(col("cols.data.forcedArrayField")).as("forcedArrayField"),col("cols.data.lastField").as("lastField"))
.select(col("city"),explode(col("forcedArrayField.value").as("middleFields")),col("lastField"))
This gives the output:
+--------+-----------------+-------------------+
|    city|              col|          lastField|
+--------+-----------------+-------------------+
|someCity|[unitField1, 123]|2020-08-02T18:02:00|
|someCity|[unitField1, 456]|2020-08-02T18:02:00|
+--------+-----------------+-------------------+
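As a sanity check, the schema above can be exercised without Kafka by applying from_json to the sample payload string on a static Dataset. Below is a minimal Java sketch, assuming a local SparkSession; the appName is illustrative.

```java
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("repro").getOrCreate();

// Java equivalent of the answer's payloadSchema.
StructType payloadSchema = new StructType()
        .add("data", new StructType()
                .add("city", DataTypes.StringType)
                .add("forcedArrayField", DataTypes.createArrayType(new StructType()
                        .add("value", DataTypes.createArrayType(new StructType()
                                .add("unit", DataTypes.StringType)
                                .add("value", DataTypes.StringType)))))
                .add("lastField", DataTypes.StringType));

// The payload part of the sample Kafka message.
String payload = "{\"data\":{\"city\":\"someCity\",\"forcedArrayField\":[{\"value\":[{\"unit\":\"unitField1\",\"value\":\"123\"},{\"unit\":\"unitField1\",\"value\":\"456\"}]}],\"lastField\":\"2020-08-02T18:02:00\"}}";

// Parse, then flatten with two explodes, mirroring the select chain above.
Dataset<Row> flattened = spark
        .createDataset(Collections.singletonList(payload), Encoders.STRING())
        .toDF("payload")
        .select(functions.from_json(functions.col("payload"), payloadSchema).as("cols"))
        .select(functions.col("cols.data.city").as("city"),
                functions.explode(functions.col("cols.data.forcedArrayField")).as("f"),
                functions.col("cols.data.lastField").as("lastField"))
        .select(functions.col("city"),
                functions.explode(functions.col("f.value")),
                functions.col("lastField"));

flattened.show(false);
```

If this static version flattens correctly, the same select chain can be applied to the streaming Dataset.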
Alternatively: your schema definition is wrong; payload and schema are probably not columns/fields. Read the data as static JSON (spark.read.json) to obtain the schema, then use that schema in Structured Streaming.
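The suggestion above can be sketched as follows, assuming a local SparkSession: infer the schema from a static read of one sample message, then pass it to readStream. The temp input directory is just a placeholder for the real stream source.

```java
import java.nio.file.Files;
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("schema-infer").getOrCreate();

// One sample Kafka message read statically so Spark infers the nested schema.
String sample = "{\"schema\":{\"type\":\"struct\"},\"payload\":{\"data\":{\"city\":\"someCity\",\"lastField\":\"2020-08-02T18:02:00\"}}}";
Dataset<String> json = spark.createDataset(Collections.singletonList(sample), Encoders.STRING());
StructType inferredSchema = spark.read().json(json).schema();

// Reuse the inferred schema for the stream instead of hand-writing it
// (the input directory here is a throwaway placeholder).
String inputDir = Files.createTempDirectory("stream-input").toString();
Dataset<Row> streamDf = spark.readStream().schema(inferredSchema).json(inputDir);
```

In practice you would point the static read at a file containing a few representative messages, since schema inference only sees the fields present in the sample.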