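Problem description
I am trying to run the simple Flink job below, which counts words using the Table API. It reads the data stream with the DataStream API and creates the table environment with the StreamTableEnvironment API. I get the exception below. Can someone help me find what is wrong with the code? I am using Flink 1.8.
Exception: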
**Exception in thread "main" org.apache.flink.table.api.TableException: Only the first field can reference an atomic type.**
at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1117)
at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1112)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:1112)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:546)
at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:91)
at Udemy_Course.TableAPIExample.CountWordExample.main(CountWordExample.java:30)
Code
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class CountWordExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment);

        DataStream<WC> streamOfWords =
            environment.fromElements(
                new WC("Hello", 1L), new WC("Howdy", 1L), new WC("Hello", 1L));

        // The stack trace points at this fromDataStream call (CountWordExample.java:30).
        Table t1 = streamTableEnvironment.fromDataStream(streamOfWords, "word,count");

        Table result = streamTableEnvironment.sqlQuery(
            "select word,count(word) as wordcount from " + t1 + " group by word");
        streamTableEnvironment.toRetractStream(result, CountWordExample.class).print();
        environment.execute();
    }

    public static class WC {
        private String word;
        private Long count;

        public WC() {}

        public WC(String word, Long count) {
            this.word = word;
            this.count = count;
        }
    }
}
Solution
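One likely cause, offered as a sketch and not verified against the original project: Flink only treats a class as a POJO when the class is public, has a public no-argument constructor, and every field is either public or reachable through a getter and a setter. WC keeps word and count private with no accessors, so Flink would fall back to a generic (atomic) type, and fromDataStream would then reject the two-field expression "word,count" with the message shown above. The stdlib-only code below approximates that rule with reflection. PojoCheck, looksLikePojo, hasAccessors and WCFixed are hypothetical names introduced for illustration, and the check is a simplification, not Flink's own type extraction.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheck {

    // WC as posted in the question: private fields, no getters or setters.
    public static class WC {
        private String word;
        private Long count;
        public WC() {}
        public WC(String word, Long count) { this.word = word; this.count = count; }
    }

    // Hypothetical fixed variant: same fields, now with getters and setters.
    public static class WCFixed {
        private String word;
        private Long count;
        public WCFixed() {}
        public WCFixed(String word, Long count) { this.word = word; this.count = count; }
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public Long getCount() { return count; }
        public void setCount(Long count) { this.count = count; }
    }

    // Simplified approximation of Flink's POJO rules (assumption: not Flink's real check).
    public static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            // A public no-argument constructor is required.
            if (!Modifier.isPublic(type.getDeclaredConstructor().getModifiers())) {
                return false;
            }
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // not part of the record's data
            }
            if (!Modifier.isPublic(mods) && !hasAccessors(type, field)) {
                return false; // private field with no getter/setter pair
            }
        }
        return true;
    }

    // True when public getX() and setX(fieldType) methods both exist for the field.
    public static boolean hasAccessors(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("WC: " + looksLikePojo(WC.class));
        System.out.println("WCFixed: " + looksLikePojo(WCFixed.class));
    }
}
```

If this is indeed the cause, adding getters and setters to WC (or making its fields public) should let fromDataStream(streamOfWords, "word,count") resolve both fields. Separately, toRetractStream(result, CountWordExample.class) will probably need a result type that matches the query's columns, for example Row.class from org.apache.flink.types.Row.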