Flink 表异常:窗口聚合只能定义在一个时间属性列上,但是遇到了TIMESTAMP(6)

问题描述

我使用的是 flink 1.12.0。尝试将数据流转换为表 A 并在表 A 上运行 sql 查询以在窗口上聚合,如下所示。我使用 f2 列作为其时间戳数据类型字段。

    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,fsSettings);

    Properties props = new Properties();

    props.setProperty("bootstrap.servers",BOOTSTRAP_SERVER);
    props.setProperty("schema.registry.url",xxx);
    props.setProperty("group.id","test");
    props.setProperty("value.deserializer",KafkaAvroDeserializer.class.getName());

    props.put("client.id","flink-kafka-example");

    FlinkKafkaConsumer<PlaybackListening> kafkaConsumer = new FlinkKafkaConsumer<>(
            "test-topic",ConfluentRegistryAvroDeserializationSchema.forSpecific(
                    Avrotest.class,prodSchemaRegistryURL),props);

    DataStreamSource<Avrotest> stream =
            env.addSource(kafkaConsumer);
    Table tableA = tEnv.fromDataStream(stream,$("f0"),$("f1"),$("f2"));
    Table result =
            tEnv.sqlQuery("SELECT f0,sum(f1),f2 FROM "
                    + tableA + " GROUP BY TUMBLE(f2,INTERVAL '1' HOUR),f1" );

    
    tEnv.toAppendStream(result,user.class).print();

    env.execute("Flink kafka test");
}

当我执行上面的代码时,我得到

线程“main”org.apache.flink.table.api.TableException 中的异常:窗口聚合只能在时间属性列上定义,但遇到 TIMESTAMP(6)。 在 org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50) 在 org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:81) 在 org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) 在 org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) 在 org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) 在 org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) 在 org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)

解决方法

为了使用表 API 对数据流执行事件时间窗口化,您需要先分配时间戳和水印。您应该在调用 fromDataStream 之前执行此操作。

对于 Kafka,通常最好直接在 assignTimestampsAndWatermarks 上调用 FlinkKafkaConsumer。有关详细信息,请参阅 watermark docskafka connector docsFlink SQL docs

,

3 个步骤:

  1. 首先分配 assignTimestampsAndWatermarks

您有多种类型的策略。

例如:

 WatermarkStrategy<Row> customTime = WatermarkStrategy
                .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event,timestamp) -> (long) event.getField("f2"));
  1. 在您的源代码中分配您在第 1 步中声明的内容:
env.addSource().assignTimestampsAndWatermarks(customTime)
  1. 声明表,并为时间戳字段设置行时间:
Table tableA = tEnv.fromDataStream(stream,$("f0"),$("f1"),$("f2").rowtime());