使用Apache Flink SQL从Kafka消息获取嵌套字段

问题描述

我正在尝试使用Apache Flink 1.11创建一个源表,在这里我可以访问JSON消息中的嵌套属性。我可以从根属性删除值,但不确定如何访问嵌套对象。

documentation建议它应该是MAP类型,但是当我设置它时,出现以下错误

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.sqlIdentifier: MAP

这是我的sql

        CREATE TABLE input(
            id VARCHAR,title VARCHAR,properties MAP
        ) WITH (
            'connector' = 'kafka-0.11','topic' = 'my-topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'python-test','format' = 'json'
        )

我的JSON看起来像这样:

{
  "id": "message-1","title": "Some Title","properties": {
    "foo": "bar"
  }
}

解决方法

您可以使用ROW在JSON消息中提取嵌套字段。您的DDL语句如下所示:

CREATE TABLE input(
             id VARCHAR,title VARCHAR,properties ROW(`foo` VARCHAR)
        ) WITH (
            'connector' = 'kafka-0.11','topic' = 'my-topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'python-test','format' = 'json'
        );
,

你也可以试试

CREATE TABLE input(
            id VARCHAR,properties MAP<STRING,STRING>
        ) WITH (
            'connector' = 'kafka-0.11','format' = 'json'
        )

唯一的区别是:MAP<STRING,STRING> vs MAP