使用MiniCluster测试flink作业以使用处理时间触发计时器

问题描述

在使用 MiniClusterWithClientResource 测试flink作业时,是否有任何方法可以控制触发计时器的处理时间?

我能够在单元测试中使用 testharness 并控制处理时间,即:

//通过直接提前操作员的处理时间来触发处理时间计时器 testHarness.setProcessingTime(300000)

因此。我可以在指定时间触发计时器

但是,我现在需要的是使用minicluster MiniClusterWithClientResource

在端到端flink作业测试中触发计时器

val flinkCluster =新的MiniClusterWithClientResource ... 并能够提前处理时间来触发onTimer方法

解决方法

在发送所有消息后,在SourceFunction类中添加一秒钟的Thread.sleep(1000)解决了该问题。

class MySourceFunction() extends RichParallelSourceFunction[]{
...

//is a one-time Delay after all messages have been sent
Thread.sleep(1000)
}