Flink 操作示例 —— 状态

flatMap(...RichFlatMapFunction)

val keyedSensorData = sensorData.keyBy(_.id)

val alerts  = keyedSensorData
  .flatMap(new TemperatureAlert(1.7))

alerts.print()

...

class TemperatureAlert(val threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
    private var lastTempState: ValueState[Double] = _

    override def open(parameters: Configuration): Unit = {
        val lastTempDesc = new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
        lastTempState = getRuntimeContext.getState[Double](lastTempDesc)
    }

    override def flatMap(reading: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
        val lastTemp = lastTempState.value()
        val tempDiff = (lastTemp - reading.temperature).abs
        if (tempDiff > threshold) {
            out.collect(reading.id, reading.temperature, tempDiff)
        }
        this.lastTempState.update(reading.temperature)
    }
}

 

flatMapWithState

val alerts = keyedSensorData.flatMapWithState[(String, Double, Double), Double] {
  case (in: SensorReading, None) =>
    (List.empty, Some(in.temperature))
  case (r: SensorReading, lastTemp: Some[Double]) =>
    val tempDiff = (r.temperature - lastTemp.get).abs
    if (tempDiff > 1.7) {
      (List((r.id, r.temperature, tempDiff)), Some(r.temperature))
    } else {
      (List.empty, Some(r.temperature))
    }
}

 

 

233

相关文章

Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的...
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能...
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽...
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很...
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasre...
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下...