How to create ksqlDB stream fields from a nested JSON object

Problem

I have a topic to which I am sending JSON in the following format:

{
  "schema": {
    "type": "string",
    "optional": true
  },
  "payload": "CustomerData{version='1',customerId='76813432',phone='76813432'}"
}

I would like to create a stream with customerId and phone, but I am not sure how to define the stream from a nested JSON object. My first attempt was:

CREATE STREAM customer (
    payload.version VARCHAR,
    payload.customerId VARCHAR,
    payload.phone VARCHAR
  ) WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
  );

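Is that the right approach? How do I dereference a nested object when defining the stream's fields?

In practice, the statement above is rejected at the field definitions: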

Caused by: line 2:12: 
extraneous input '.' expecting {'EMIT','CHANGES','INTEGER','DATE','TIME','TIMESTAMP','INTERVAL','YEAR','MONTH','DAY',

Solution

Using the EXTRACTJSONFIELD function

You can use a ksqlDB function called EXTRACTJSONFIELD.

First, declare the top-level schema and payload fields as strings:

CREATE STREAM customer (
  schema VARCHAR,
  payload VARCHAR
) WITH (
  KAFKA_TOPIC='customers',
  VALUE_FORMAT='JSON'
);

Then you can select nested fields from the JSON held in payload:

SELECT EXTRACTJSONFIELD(payload,'$.version') AS version FROM customer;

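However, it looks like your payload data is not valid JSON. It is a plain string of the form CustomerData{...}, so there are no JSON fields inside it for EXTRACTJSONFIELD to read.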
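
For a payload in that shape, one possible workaround is to pull the values out with a regular expression instead. The following is only a sketch: it assumes your ksqlDB version provides REGEXP_EXTRACT with a capture-group argument, that the customer stream is declared as above with payload as VARCHAR, and that each value is always wrapped in straight single quotes as in the sample message. The patterns are illustrative and not taken from the original question.

```sql
-- Sketch: extract fields from a payload such as
--   CustomerData{version='1',customerId='76813432',phone='76813432'}
-- A single quote inside a ksqlDB string literal is written as two single quotes,
-- so the first pattern below is the regex: customerId='([^']*)'
SELECT
  REGEXP_EXTRACT('customerId=''([^'']*)''', payload, 1) AS customer_id,
  REGEXP_EXTRACT('phone=''([^'']*)''', payload, 1) AS phone
FROM customer
EMIT CHANGES;
```

If the producer can be changed, emitting the payload as real JSON is likely the more robust option, since it avoids depending on the exact string layout.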


Using a STRUCT schema

If your payload were instead encoded as a nested JSON object, so that your data looked like this:

{
  "schema": {
    "type": "string",
    "optional": true
  },
  "payload": {
    "version": "1",
    "customerId": "76813432",
    "phone": "76813432"
  }
}

then you could declare the nested objects as STRUCT columns:

CREATE STREAM customer (
  schema STRUCT<
    type VARCHAR,
    optional BOOLEAN>,
  payload STRUCT<
    version VARCHAR,
    customerId VARCHAR,
    phone VARCHAR>
) WITH (
  KAFKA_TOPIC='customers',
  VALUE_FORMAT='JSON'
);

Finally, you can reference the individual fields with the -> operator:

CREATE STREAM customer_analysis AS
SELECT
  payload->version AS VERSION,
  payload->customerId AS CUSTOMER_ID,
  payload->phone AS PHONE
FROM customer
EMIT CHANGES;