Flink实例四十四: Operators五REDUCE

REDUCE

reduce算子是滚动聚合的泛化实现。它将一个ReduceFunction应用到了一个KeyedStream上面去。reduce算子将会把每一个输入事件和当前已经reduce出来的值做聚合计算。reduce操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。

对分组数据进行处理更为通用的方法是使用reduce算子。

reduce算子

上图展示了reduce算子的原理:reduce在按照同一个Key分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。

reduce函数可以通过实现接口ReduceFunction来创建一个类。ReduceFunction接口定义了reduce()方法,此方法接收两个输入事件,输入一个相同类型的事件。

// T: the element type
ReduceFunction[T]
    > reduce(T, T): T

下面的例子,流根据传感器ID分流,然后计算每个传感器的当前最大温度值。

scala version

val maxTempPerSensor = keyed.reduce((r1, r2) => r1.temperature.max(r2.temperature))

java version

复制代码

DataStream<SensorReading> maxTempPerSensor = keyed
        .reduce((r1, r2) -> {
            if (r1.temperature > r2.temperature) {
                return r1;
            } else {
                return r2;
            }
        });

复制代码

reduce作为滚动聚合的泛化实现,同样也要针对每一个key保存状态。因为状态从来不会清空,所以我们需要将reduce算子应用在一个有限key的流上。

实例二:

case class score(name: String, course: String, score: Int)

val dataStream: DataStream[score] = senv.fromElements(
  score("Li", "English", 90), score("Wang", "English", 88), score("Li", "Math", 85),
  score("Wang", "Math", 92), score("Liu", "Math", 91), score("Liu", "English", 87))

class MyReduceFunction() extends ReduceFunction[score] {
  // reduce 接受两个输入,生成一个同类型的新的输出
  override def reduce(s1: score, s2: score): score = {
    score(s1.name, "Sum", s1.score + s2.score)
  }
}

val sumReduceFunctionStream = dataStream
      .keyBy("name")
      .reduce(new MyReduceFunction)

使用Lambda表达式更简洁一些:

val sumLambdaStream = dataStream
      .keyBy("name")
      .reduce((s1, s2) => score(s1.name, "Sum", s1.score + s2.score))

 

相关文章

Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的...
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能...
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽...
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很...
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasre...
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下...