KSQL嵌套json需要将密钥保存在多行中

问题描述

以下是指标列中的示例数据:

{
   "data": "[{'val': '[{'TS': '52343223486','val': '[{ 'param': 'abc'}]'},{'TS': '9876543234','val': '[{ 'param1': 'xyz'}]'}]}]"
}

我需要在多行中填充每条记录。

结果

指标

"{"TS": "52343223486","val": "{ "param": "abc"}"}"
"{"TS": "98765432434","val": "{ "param1": "xyz"}"}"

是否有查询可以实现?

解决方法

根据this article,您可以拆分指标,例如:

CREATE STREAM my_stream (
  data struct<
    val array<struct<
      ts varchar,val varchar
    >>
  >
) WITH (KAFKA_TOPIC='my_topic',VALUE_FORMAT='JSON');

SELECT EXPLODE(DATA->VAL) FROM my_stream EMIT CHANGES;

因此,您可以创建新的流,例如:

CREATE STREAM my_metrics AS SELECT EXPLODE(DATA->VAL) EMIT FROM my_stream EMIT CHANGES;

P.S。 您输入的必须是有效的 JSON ,例如:

{"data": {"val": [{"TS": "52343223486","val": {"param": "abc"}},{"TS": "9876543234","val": {"param1": "xyz"}}]}}