问题描述
在使用 MiniClusterWithClientResource 测试flink作业时,是否有任何方法可以控制触发计时器的处理时间?
我能够在单元测试中使用 testharness 并控制处理时间,即:
//通过直接提前操作员的处理时间来触发处理时间计时器 testHarness.setProcessingTime(300000)
因此。我可以在指定时间触发计时器
但是,我现在需要的是使用minicluster MiniClusterWithClientResource
在端到端flink作业测试中触发计时器val flinkCluster =新的MiniClusterWithClientResource ... 并能够提前处理时间来触发onTimer方法
解决方法
在发送所有消息后,在SourceFunction类中添加一秒钟的Thread.sleep(1000)解决了该问题。
class MySourceFunction() extends RichParallelSourceFunction[]{
...
//is a one-time Delay after all messages have been sent
Thread.sleep(1000)
}