如何在scala中使用flink fold函数

这是一个非常有效的尝试使用Flink折叠与 scala匿名函数

val myFoldFunction = (x: Double,t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0,myFoldFunction : Function2[Double,(Double,String),Double])

它汇编得很好,但在执行时,我得到了“类型擦除问题”(见下文).在Java中这样做很好,但当然更冗长.我喜欢简洁明了的lambda.我怎么能在scala中做到这一点?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.Typeinformation,scala.reflect.classtag)' Could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

解决方法

您遇到的问题是Flink [1]中的错误.问题源于Flink的TypeExtractor以及Scala DataStream API在Java实现之上的实现方式. TypeExtractor无法为Scala类型生成Typeinformation,因此返回MissingTypeinformation.创建StreamFold运算符后,手动设置此缺失类型信息.但是,StreamFold运算符的实现方式是它不接受MissingTypeinformation,因此在设置正确的类型信息之前失败.

我已经打开了一个拉取请求[2]来解决这个问题.它应该在接下来的两天内合并.通过使用最新的0.10快照版本,您的问题应该得到解决.

> [1] https://issues.apache.org/jira/browse/FLINK-2631
> [2] https://github.com/apache/flink/pull/1101

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...