问题描述
我有一个Flink KeyedcoprocessFunction
,它在较大的Flink流作业中注册了处理时间计时器,并且我正在尝试使用Flink MiniCluster为整个作业创建单元测试。但是我无法在onTimer()
中回拨KeyedcoprocessFunction
来触发。
有人有这个工作吗?是否需要任何特殊配置?
切换到事件时间效果很好,所以我想知道这是否不适用于Flink MiniCluster或我的实现是否有问题。
我在Scala中编写了一个简单的测试,以查看是否可以使它正常工作。
import org.apache.flink.api.common.typeinfo.Type@R_328_4045@ion
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction,SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.streaming.runtime.util.TestListResultSink
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.flink.util.Collector
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.LoggerFactory
class TimerTest extends AnyFlatSpec with BeforeAndAfter {
private val SlotsPerTaskMgr = 1
val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(SlotsPerTaskMgr)
.setNumberTaskManagers(1)
.build)
before {
flinkCluster.before()
}
after {
flinkCluster.after()
}
"MiniCluster" should "trigger onTimer" in {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
implicit val longTypeInfo: Type@R_328_4045@ion[Long] = Type@R_328_404[email protected](classOf[Long])
val sink = new TestListResultSink[Long]
env.addSource(new MyLongSource(100))
.keyBy(v => v)
.process(new MyProccesor())
.addSink(sink)
env.execute()
println("Received " + sink.getResult.size() + " output records.")
}
}
class MyProccesor extends KeyedProcessFunction[Long,Long,Long] {
private val log = LoggerFactory.getLogger(this.getClass)
override def processElement(
value: Long,ctx: KeyedProcessFunction[Long,Long]#Context,out: Collector[Long]): Unit = {
log.info("Received {} at {}",value,ctx.timerService().currentProcessingTime())
if (value % 10 == 0) {
log.info("Scheduling processing timer for {}",ctx.timerService().currentProcessingTime() + 10)
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 10)
}
}
override def onTimer(
timestamp: Long,Long]#OnTimerContext,out: Collector[Long]): Unit = {
log.info("Received onTimer at {}",timestamp)
out.collect(timestamp)
}
}
class MyLongSource(n:Int) extends ParallelSourceFunction[Long] {
@volatile private var stop = false
override def run(ctx: SourceFunction.sourceContext[Long]): Unit = {
for(i <- 1 to n) {
if(stop) return;
println("Sending " + i)
ctx.collect(i)
}
Thread.sleep(1000)
}
override def cancel(): Unit = {
stop = true
}
}
通过在源Thread.sleep(1000)
方法的末尾添加run()
,我终于能够获得一些一致的结果。好像源一旦退出,消息就会得到处理,然后即使有待处理的计时器也将关闭所有内容。
解决方法
当Flink作业关闭时,任何未决的处理时间计时器都将被忽略。他们从不开火。
对于它的价值,Flink开发人员邮件列表上正在进行一些讨论,涉及提供一个选项来触发所有未决的处理时间计时器。参见http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-134-DataStream-Semantics-for-Bounded-Input-td37365.html#a37558。