Problem
{
  "schema": {
    "type": "string",
    "optional": true
  },
  "payload": "CustomerData{version='1',customerId='76813432',phone='76813432'}"
}
I want to create a stream with customerId and phone, but I'm not sure how to define the stream in terms of the nested JSON object.
CREATE STREAM customer (
    payload.version VARCHAR,
    payload.customerId VARCHAR,
    payload.phone VARCHAR
) WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
);
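Would it look like that? How do I dereference a nested object when defining the stream's fields?
In fact, the field definitions above are rejected: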
Caused by: line 2:12:
extraneous input '.' expecting {'EMIT','CHANGES','INTEGER','DATE','TIME','TIMESTAMP','INTERVAL','YEAR','MONTH','DAY',
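Before looking at the answer, it helps to check what shape the message actually has. The minimal Python sketch below (the helper name payload_type is hypothetical, and the quotes are normalised to straight ASCII quotes) parses the message from the question and reports that payload is a plain string, not a nested object, which matches the "type": "string" declared in its own schema block.

```python
import json

# The message from the question, with straight quotes restored.
RAW_MESSAGE = '''
{
  "schema": {"type": "string", "optional": true},
  "payload": "CustomerData{version='1',customerId='76813432',phone='76813432'}"
}
'''


def payload_type(raw: str) -> str:
    """Return the Python type name of the top-level 'payload' value."""
    message = json.loads(raw)
    return type(message["payload"]).__name__


if __name__ == "__main__":
    print(payload_type(RAW_MESSAGE))  # prints: str
```

Because payload is a string, there are no nested JSON fields such as customerId for a schema to point at, independently of the syntax error above.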
Solution
Use the EXTRACTJSONFIELD function
ksqlDB has a function called EXTRACTJSONFIELD that you can use for this.
First, declare the top-level schema and payload fields as VARCHAR:
CREATE STREAM customer (
    schema VARCHAR,
    payload VARCHAR
) WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
);
Then you can select the nested fields from the JSON string:
SELECT EXTRACTJSONFIELD(payload,'$.version') AS version FROM customer;
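To make the JSONPath lookup concrete, here is a rough Python approximation. The function extract_json_field is hypothetical and is not ksqlDB's implementation: it only understands simple "$.a.b" paths, and in this sketch invalid JSON or a missing path yields None.

```python
import json
from typing import Optional


def extract_json_field(text: str, path: str) -> Optional[str]:
    """Rough, hypothetical approximation of EXTRACTJSONFIELD.

    Supports only simple '$.field' or '$.a.b' paths. Returns None when the
    text is not valid JSON or the path does not exist.
    """
    if not path.startswith("$."):
        raise ValueError("only simple '$.field' paths are supported here")
    try:
        node = json.loads(text)
    except json.JSONDecodeError:
        return None
    # Walk the object one key at a time.
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    # Strings come back unquoted; anything else is re-serialised as JSON text.
    return node if isinstance(node, str) else json.dumps(node)
```

Given a payload such as {"version": "1", "customerId": "76813432", "phone": "76813432"}, the path "$.version" returns "1". Given the question's CustomerData{...} string, this sketch returns None, because that string is not JSON.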
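However, your payload does not appear to be valid JSON: CustomerData{version='1',customerId='76813432',phone='76813432'} uses a class-name prefix and single-quoted key=value pairs rather than a JSON object, so EXTRACTJSONFIELD cannot pull fields out of it.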
Use a STRUCT schema
If the payload is instead encoded as a JSON object, your data looks like this:
{
  "schema": {
    "type": "string",
    "optional": true
  },
  "payload": {
    "version": "1",
    "customerId": "76813432",
    "phone": "76813432"
  }
}
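One way to check that a message has this shape is to build it from native dictionaries and serialise it, so that payload is written as a nested object rather than a string. A minimal Python sketch; the function name build_message is hypothetical, and the schema block is copied unchanged from the example above.

```python
import json


def build_message(version: str, customer_id: str, phone: str) -> str:
    """Serialise a customer record with 'payload' as a nested JSON object."""
    message = {
        # Copied as-is from the example message above.
        "schema": {"type": "string", "optional": True},
        "payload": {
            "version": version,
            "customerId": customer_id,
            "phone": phone,
        },
    }
    return json.dumps(message)
```

Parsing the result back with json.loads gives a dict under "payload", which is the shape a STRUCT column expects.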
Then you can define STRUCT columns as follows:
CREATE STREAM customer (
    schema STRUCT<
        type VARCHAR,
        optional BOOLEAN>,
    payload STRUCT<
        version VARCHAR,
        customerId VARCHAR,
        phone VARCHAR>
) WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
);
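As an analogy for how the nested STRUCT columns line up with the JSON, the Python sketch below mirrors each STRUCT as a dataclass and maps one message onto them. The class and function names are hypothetical; the sketch matches field names exactly (case-sensitive) and would raise TypeError on unexpected keys, which are simplifications and not a description of ksqlDB's deserializer.

```python
import json
from dataclasses import dataclass


@dataclass
class Schema:
    # Mirrors schema STRUCT<type VARCHAR, optional BOOLEAN>
    type: str
    optional: bool


@dataclass
class Payload:
    # Mirrors payload STRUCT<version VARCHAR, customerId VARCHAR, phone VARCHAR>
    version: str
    customerId: str
    phone: str


@dataclass
class CustomerRow:
    schema: Schema
    payload: Payload


def decode_row(raw: str) -> CustomerRow:
    """Map one JSON message onto the nested STRUCT-like dataclasses."""
    data = json.loads(raw)
    return CustomerRow(
        schema=Schema(**data["schema"]),
        payload=Payload(**data["payload"]),
    )
```

After decoding, row.payload.customerId plays the role of payload->customerId in ksqlDB.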
Finally, you can reference the individual fields like this:
CREATE STREAM customer_analysis AS
SELECT
    payload->version AS VERSION,
    payload->customerId AS CUSTOMER_ID,
    payload->phone AS PHONE
FROM customer
EMIT CHANGES;
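The projection in that query can be pictured as flattening each message's payload into three output columns. The Python generator below is only an analogy (customer_analysis just borrows the stream's name and is not how ksqlDB executes the query); in this sketch a missing field becomes None.

```python
import json
from typing import Dict, Iterable, Iterator, Optional


def customer_analysis(messages: Iterable[str]) -> Iterator[Dict[str, Optional[str]]]:
    """Flatten each message's payload the way the SELECT projects it."""
    for raw in messages:
        payload = json.loads(raw).get("payload", {})
        yield {
            "VERSION": payload.get("version"),
            "CUSTOMER_ID": payload.get("customerId"),
            "PHONE": payload.get("phone"),
        }
```

Feeding it the corrected message from above yields one row: {"VERSION": "1", "CUSTOMER_ID": "76813432", "PHONE": "76813432"}.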