如何使用Flink MiniCluster触发ProcessTimeTimer

问题描述

我有一个Flink KeyedcoprocessFunction,它在较大的Flink流作业中注册了处理时间计时器,并且我正在尝试使用Flink MiniCluster为整个作业创建单元测试。但是我无法在onTimer()中回拨KeyedcoprocessFunction来触发。

有人有这个工作吗?是否需要任何特殊配置?

切换到事件时间效果很好,所以我想知道这是否不适用于Flink MiniCluster或我的实现是否有问题。

我在Scala中编写了一个简单的测试,以查看是否可以使它正常工作。

import org.apache.flink.api.common.typeinfo.Type@R_328_4045@ion
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction,SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.streaming.runtime.util.TestListResultSink
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.flink.util.Collector
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.LoggerFactory

class TimerTest extends AnyFlatSpec with BeforeAndAfter {

  private val SlotsPerTaskMgr = 1
  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(SlotsPerTaskMgr)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "MiniCluster" should "trigger onTimer" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    implicit val longTypeInfo: Type@R_328_4045@ion[Long] = Type@R_328_404[email protected](classOf[Long])

    val sink = new TestListResultSink[Long]

    env.addSource(new MyLongSource(100))
      .keyBy(v => v)
      .process(new MyProccesor())
      .addSink(sink)

    env.execute()

    println("Received " + sink.getResult.size() + " output records.")
  }

}

class MyProccesor extends KeyedProcessFunction[Long,Long,Long] {

  private val log = LoggerFactory.getLogger(this.getClass)

  override def processElement(
                               value: Long,ctx: KeyedProcessFunction[Long,Long]#Context,out: Collector[Long]): Unit = {
    log.info("Received {} at {}",value,ctx.timerService().currentProcessingTime())
    if (value % 10 == 0) {
      log.info("Scheduling processing timer for {}",ctx.timerService().currentProcessingTime() + 10)
      ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 10)
    }
  }

  override def onTimer(
                        timestamp: Long,Long]#OnTimerContext,out: Collector[Long]): Unit = {
    log.info("Received onTimer at {}",timestamp)
    out.collect(timestamp)
  }
}

class MyLongSource(n:Int) extends ParallelSourceFunction[Long] {
  @volatile private var stop = false

  override def run(ctx: SourceFunction.sourceContext[Long]): Unit = {
    for(i <- 1 to n) {
      if(stop) return;
      println("Sending " + i)
      ctx.collect(i)
    }

    Thread.sleep(1000)
  }

  override def cancel(): Unit = {
    stop = true
  }
}

通过在源Thread.sleep(1000)方法的末尾添加run(),我终于能够获得一些一致的结果。好像源一旦退出,消息就会得到处理,然后即使有待处理的计时器也将关闭所有内容

解决方法

当Flink作业关闭时,任何未决的处理时间计时器都将被忽略。他们从不开火。

对于它的价值,Flink开发人员邮件列表上正在进行一些讨论,涉及提供一个选项来触发所有未决的处理时间计时器。参见http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-134-DataStream-Semantics-for-Bounded-Input-td37365.html#a37558