我正在关注Flink:
Monitoring the Wikipedia Edit Stream的快速启动示例.
这个例子是用Java编写的,我在Scala中实现它,如下所示:
/** * Wikipedia Edit Monitoring */ object WikipediaEditMonitoring { def main(args: Array[String]) { // set up the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource) val result = edits.keyBy( _.getUser ) .timeWindow(Time.seconds(5)) .fold(("",0L)) { (acc: (String,Long),event: WikipediaEditEvent) => { (event.getUser,acc._2 + event.getByteDiff) } } result.print // execute program env.execute("Wikipedia Edit Monitoring") } }
但是,Flink中的fold函数已被弃用,建议使用聚合函数.
但是我没有找到关于如何将弃用的折叠转换为聚合的示例或教程.
知道怎么做吗?可能不仅仅是通过应用聚合.
UPDATE
/** * Wikipedia Edit Monitoring */ object WikipediaEditMonitoring { def main(args: Array[String]) { // set up the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource) val result = edits .map( e => UserWithEdits(e.getUser,e.getByteDiff) ) .keyBy( "user" ) .timeWindow(Time.seconds(5)) .sum("edits") result.print // execute program env.execute("Wikipedia Edit Monitoring") } /** Data type for words with count */ case class UserWithEdits(user: String,edits: Long) }
我也想知道如何使用自定义AggregateFunction
进行实现.
UPDATE
我按照此文档:AggregateFunction,但有以下问题:
在版本1.3的接口AggregateFunction
的源代码中,您将看到add确实返回void:
void add(IN value,ACC accumulator);
但是对于版本1.4 AggregateFunction
,正在返回:
ACC add(IN value,ACC accumulator);
我该怎么处理?
我使用的Flink版本是1.3.2,此版本的文档没有AggregateFunction,但是还没有版本1.4.
解决方法
您将找到AggregateFunction
in the Flink 1.4 docs的一些文档,包括一个示例.
1.3.2中包含的版本仅限于与可变累加器类型一起使用,其中add操作修改累加器.这已经是fixed for Flink 1.4,但还没有发布.