Consuming nested JSON messages from Kafka with ClickHouse

Problem description

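ClickHouse can certainly read JSON messages from Kafka, as long as each message is a flat JSON document.

We indicate this in ClickHouse with kafka_format = 'JSONEachRow'.

This is how we currently use it: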

CREATE TABLE topic1_kafka
(
    ts Int64,
    event String,
    title String,
    msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
         kafka_topic_list = 'topic1',
         kafka_num_consumers = 1,
         kafka_group_name = 'ch1',
         kafka_format = 'JSONEachRow'

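This works fine as long as the producers send flat JSON to the topic that topic1_kafka reads. But not all producers send flat JSON; most applications generate nested JSON documents like the following: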

{
  "ts": 1598033988,"deviceid": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663","location": [39.920515,32.853708],"stats": {
    "temp": 71.2,"total_memory": 32,"used_memory": 21.2
  }
}

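Unfortunately, the JSON document above is not compatible with JSONEachRow as we use it, so ClickHouse cannot map the fields of the JSON document to the columns of the table.

Is there any way to do this mapping?

EDIT: We want to map the nested JSON to a flat table like this: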

CREATE TABLE topic1
(
    ts Int64,
    deviceid String,
    location_1 Float64,
    location_2 Float64,
    stats_temp Float64,
    stats_total_memory Float64,
    stats_used_memory Float64
) ENGINE = MergeTree()
ORDER BY ts

Solution

It looks like one way is to read the "raw" message as a String and then process each row with the JSON functions in the consumer Materialized View.

WITH '{"ts": 1598033988,"deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663","location": [39.920515,32.853708],"stats": { "temp": 71.2,"total_memory": 32,"used_memory": 21.2 }}' AS raw
SELECT
  JSONExtractUInt(raw, 'ts') AS ts,
  JSONExtractString(raw, 'deviceId') AS deviceId,
  arrayMap(x -> toFloat32(x), JSONExtractArrayRaw(raw, 'location')) AS location,
  JSONExtract(raw, 'stats', 'Tuple(temp Float64, total_memory Float64, used_memory Float64)') AS stats,
  stats.1 AS temp,
  stats.2 AS total_memory,
  stats.3 AS used_memory;

/*
┌─────────ts─┬─deviceId─────────────────────────────┬─location──────────────┬─stats────────────────────────┬─temp─┬─total_memory─┬────────used_memory─┐
│ 1598033988 │ cf060111-dbe6-4aa8-a2d0-d5aa17f45663 │ [39.920513,32.853706] │ (71.2,32,21.200000000000003) │ 71.2 │           32 │ 21.200000000000003 │
└────────────┴──────────────────────────────────────┴───────────────────────┴──────────────────────────────┴──────┴──────────────┴────────────────────┘
*/

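Note: for numbers with a fractional part, use Float64 rather than Float32 (see the related CH Issue 13962). The Float32 location values in the output above show the precision loss.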
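
Applied to the tables from the question, the raw-String approach could be sketched as follows. This is a minimal sketch, not a tested setup: the names topic1_kafka_raw, topic1_mv and the consumer group ch1_raw are hypothetical, and it assumes a ClickHouse version that supports the JSONAsString input format, which places each whole JSON message into a single String column. The target table topic1 is the flat table defined in the question, and the key 'deviceid' follows the spelling in the question's sample message.

```sql
-- Hypothetical raw consumer table: one String column holding the whole message.
-- Assumes the JSONAsString format is available in the ClickHouse version in use.
CREATE TABLE topic1_kafka_raw
(
    raw String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
         kafka_topic_list = 'topic1',
         kafka_num_consumers = 1,
         kafka_group_name = 'ch1_raw',   -- hypothetical group name
         kafka_format = 'JSONAsString';

-- Hypothetical materialized view: flattens each message into the columns of topic1.
-- Array positions passed to the JSONExtract* functions are 1-based.
CREATE MATERIALIZED VIEW topic1_mv TO topic1 AS
SELECT
    JSONExtractInt(raw, 'ts')                      AS ts,
    JSONExtractString(raw, 'deviceid')             AS deviceid,
    JSONExtractFloat(raw, 'location', 1)           AS location_1,
    JSONExtractFloat(raw, 'location', 2)           AS location_2,
    JSONExtractFloat(raw, 'stats', 'temp')         AS stats_temp,
    JSONExtractFloat(raw, 'stats', 'total_memory') AS stats_total_memory,
    JSONExtractFloat(raw, 'stats', 'used_memory')  AS stats_used_memory
FROM topic1_kafka_raw;
```

JSONExtractFloat returns Float64, which is consistent with the note above about avoiding Float32. Keys that are missing from a message come back as 0 or an empty string rather than raising an error, so malformed messages may need separate handling.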


Using standard data types instead requires changing the JSON schema:

  1. stats represented as a Tuple
CREATE TABLE test_tuple_field
(
    ts Int64,
    deviceId String,
    location Array(Float32),
    stats Tuple(Float32, Float32, Float32)
) ENGINE = MergeTree()
ORDER BY ts;


INSERT INTO test_tuple_field FORMAT JSONEachRow
{ "ts": 1598033988, "stats": [71.2, 32, 21.2]};
  2. stats represented as a Nested structure
CREATE TABLE test_nested_field
(
    ts Int64,
    stats Nested (temp Float32, total_memory Float32, used_memory Float32)
) ENGINE = MergeTree()
ORDER BY ts;


SET input_format_import_nested_json=1;
INSERT INTO test_nested_field FORMAT JSONEachRow 
{ "ts": 1598033988,"stats": { "temp": [71.2],"total_memory": [32],"used_memory": [21.2] }};
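
To get flat columns back out of either variant, queries along the following lines should work. This is a sketch against the two test tables above, not part of the original answer; the column aliases are chosen here for illustration.

```sql
-- Tuple variant: elements are addressed by 1-based position.
SELECT
    ts,
    stats.1 AS temp,
    stats.2 AS total_memory,
    stats.3 AS used_memory
FROM test_tuple_field;

-- Nested variant: each subcolumn is stored as an array,
-- so ARRAY JOIN unrolls it into one row per array element.
SELECT
    ts,
    stats.temp         AS temp,
    stats.total_memory AS total_memory,
    stats.used_memory  AS used_memory
FROM test_nested_field
ARRAY JOIN stats;
```

The Nested variant fits producers that can send several stats readings per message; with a single reading per message the arrays always have length one, and the Tuple variant or the raw-String approach is likely simpler.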

See the related answer: ClickHouse JSON parse exception: Cannot parse input: expected ',' before