scala – Flink:如何将弃用的折叠转换为聚合?

我正在关注Flink: Monitoring the Wikipedia Edit Stream快速启动示例.

这个例子是用Java编写的,我在Scala中实现它,如下所示:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("",0L)) {
        (acc: (String,Long),event: WikipediaEditEvent) => {
          (event.getUser,acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}

但是,Flink中的fold函数已被弃用,建议使用聚合函数.

enter image description here

但是我没有找到关于如何将弃用的折叠转换为聚合的示例或教程.

知道怎么做吗?可能不仅仅是通过应用聚合.

UPDATE

我有一个实现如下:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser,e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /** Data type for words with count */
  case class UserWithEdits(user: String,edits: Long)
}

我也想知道如何使用自定义AggregateFunction进行实现.

UPDATE

我按照此文档:AggregateFunction,但有以下问题:

在版本1.3的接口AggregateFunction的源代码中,您将看到add确实返回void:

void add(IN value,ACC accumulator);

但是对于版本1.4 AggregateFunction,正在返回:

ACC add(IN value,ACC accumulator);

我该怎么处理?

我使用的Flink版本是1.3.2,此版本的文档没有AggregateFunction,但是还没有版本1.4.

enter image description here

解决方法

您将找到AggregateFunction in the Flink 1.4 docs的一些文档,包括一个示例.

1.3.2中包含的版本仅限于与可变累加器类型一起使用,其中add操作修改累加器.这已经是fixed for Flink 1.4,但还没有发布.

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...