Flink表,创建表数组类型错误“ValidationException”

问题描述

我创建了一个包含数据类型字段的flink表,错误类型不匹配。 我想知道如何在flink表中创建一个包含数组类型的临时表。

public class FlinkConnectorClickhouse {
    public static void main(String[] args) throws Exception {
        // create environments of both APIs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // create a DataStream
        DataStream<Order> dataStream = env.fromCollection(Arrays.asList(
                new Order(2L,"pen",1,Arrays.asList("name01","name02","name03"),Arrays.asList(1,2,3)));
        Table inputTable = tableEnv.fromDataStream(dataStreamMap,$("user").as("user_a"),$("product"),$("amount"),$("name_list"),$("id_list"));

        // register the Table object as a view and query it
        tableEnv.createTemporaryView("InputTable",inputTable);

        tableEnv.executesql("CREATE TABLE sink_table (\n" +
                "    `user_a` BIGINT,\n" +
                "    `product` VARCHAR,\n" +
                "    `amount` BIGINT,\n" +
                "    `name_list` ARRAY<STRING>,\n" +
                "    `id_list` ARRAY<INT>,\n" +
                "    PRIMARY KEY (user_a) NOT ENFORCED /* 如果指定 pk,进入 upsert 模式 */\n" +
                ") WITH (\n" +
                ")");

        TableResult resultTable = tableEnv.executesql("INSERT INTO sink_table SELECT user_a,product,amount,name_list,id_list FROM InputTable");
        env.execute();
    }
    public static class Order {
        private Long user;
        private String product;
        private Integer amount;
        private List<String> name_list;
        private List<Integer> id_list;
}

解决方法

            ") WITH (\n" +
            ")");

可能是哪里出错了。你的水槽连接器在哪里?是卡夫卡吗?是蜂巢吗?是带JDBC的普通数据库吗?

您会在此处的“表 API 连接器”下找到一个列表:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/。请确保您遵循正确的 JAR 打包/部署/管理政策。

如果您是初学者,您可能会对与 Flink 捆绑在一起的这个感兴趣:

) WITH (
  'connector' = 'filesystem',-- required: specify the connector
  'path' = 'file:///path/to/whatever',-- required: path to a directory
  'format' = 'csv'                     -- required: file system connector requires to specify a format,)

或者这个:

WITH (
  'connector' = 'print'
);

或者这个:

WITH (
  'connector' = 'blackhole'
);