问题描述
我们以此为例:
基于此概念,我在 onTimer
中被称为 KeyedProcessFunction
函数:
(when a == "start" -> ctx.timerService().registerProcessingTimeTimer(some time in long))
,
但是随着这个概念出现了一个新的记录,这意味着骑行结束:
(when a == "end" -> ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime();))
这第二个定时器立即触发动作,思路是清理之前设置的定时器,因为我感觉不到前一个定时器还活着。
重点是,如果动作二没有在一小时内发生,例如我需要做某事(ctx.timerService().registerProcessingTimeTimer(some time in long))
,但如果预期值在那个小时内到达,则不需要触发计时器或触发器计时器立即忘记 (ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime();))
之前编程的另一个计时器,但我遇到了问题,因为无论发生什么情况都会触发计时器,前一个计时器也被触发。
我会尝试使用 ctx.timerService().deleteProcessingTimeTimer(some time in long);
但它似乎不起作用。
看例子:
事件顺序:事件A总是比B先到。 说明:事件 B 必须在事件 A 到达后一小时范围内到达,否则定时器会在 A 到达后一小时触发,但如果 B 在定时器设置一小时后到达,则应立即触发定时器并触发之前定义的计时器绝不能被调用(删除)。
SingleOutputStreamOperator<Events> abandonment = stream.keyBy(e -> e.id)
.process(new KeyedProcessFunctionName());
public class KeyedProcessFunctionNameextends KeyedProcessFunction<String,Event,Event> {
@Override
public void processElement(Event e,Context ctx,Collector<Event> out) throws Exception {
if (e.value.equalsIgnoreCase("B")) {
{
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime());
}
if (stateTwo.value() == null && e.value.equalsIgnoreCase("A")) {
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + SOME_FIXED_TIME_IN_LONG);
}
}
}
@Override
public void onTimer(long timestamp,OnTimerContext ctx,Collector<Event> out) throws Exception {
/*when this timer is been called because of B it must not been called because of a prevIoUs timer set because of A*/
}
}
有什么想法吗?
解决方法
定时器总是(隐式地)绑定到一个键上。创建计时器时,它与正在处理的事件的键(或当前触发的计时器的键)相关联。同样,您只能删除与当前上下文中的键关联的计时器。如果定时器删除似乎不起作用,也许这就是原因。
要记住的另一个事实是计时器已进行重复数据删除。换句话说,对于任何给定的(键,时间戳)对,最多可以有一个事件时间计时器和一个处理时间计时器。后续尝试为相同的密钥和时间戳注册另一个计时器将被忽略。
有时使用键控状态来记住当计时器(对于同一个键)触发时应该做什么是有帮助的。如果每个键有多个计时器,您可以使用由时间戳索引的 MapState
为每个计时器保留一些状态。