Flink TableAPI中的异常

问题描述

我正在尝试在简单的Flink作业下运行以使用TableAPI计数单词。使用DataStream API读取数据流,并使用StreamTableEnvironment API创建Table环境。我在例外之下。有人可以帮我代码有什么问题吗?我正在使用Flink 1.8版本。

例外:

**Exception in thread "main" org.apache.flink.table.api.TableException: Only the first field can reference an atomic type.**
    at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1117)
    at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1112)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:1112)
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:546)
    at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:91)
    at Udemy_Course.TableAPIExample.CountWordExample.main(CountWordExample.java:30)

代码

public class CountWordExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnvironment=StreamTableEnvironment.create(environment);


        DataStream<WC> streamOfWords =
                environment.fromElements(
                        new WC("Hello",1L),new WC("Howdy",new WC("Hello",1L));

        Table t1 = streamTableEnvironment.fromDataStream(streamOfWords,"word,count");

        Table result = streamTableEnvironment.sqlQuery("select word,count(word) as wordcount 
        from " + t1 + " group by word");


        streamTableEnvironment.toRetractStream(result,CountWordExample.class ).print();

        environment.execute();
    }

    public static class WC {

        private String word;
        private Long count;

        public WC() {}

        public WC(String word,Long count) {
            this.word=word;
            this.count=count;
        }
    }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)