在 Flink 中停用 KeyedProcessFunction 上的 onTimer

问题描述

我们以此为例:

基于此概念,我在 onTimer 中被称为 KeyedProcessFunction 函数

(when a == "start" -> ctx.timerService().registerProcessingTimeTimer(some time in long)),

但是随着这个概念出现了一个新的记录,这意味着骑行结束:

(when a == "end" -> ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime();))

这第二个定时器立即触发动作,思路是清理之前设置的定时器,因为我感觉不到前一个定时器还活着。

重点是,如果动作二没有在一小时内发生,例如我需要做某事(ctx.timerService().registerProcessingTimeTimer(some time in long)),但如果预期值在那个小时内到达,则不需要触发计时器或触发器计时器立即忘记 (ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime();)) 之前编程的另一个计时器,但我遇到了问题,因为无论发生什么情况都会触发计时器,前一个计时器也被触发。

我会尝试使用 ctx.timerService().deleteProcessingTimeTimer(some time in long); 但它似乎不起作用。

看例子:

事件顺序:事件A总是比B先到。 说明:事件 B 必须在事件 A 到达后一小时范围内到达,否则定时器会在 A 到达后一小时触发,但如果 B 在定时器设置一小时后到达,则应立即触发定时器并触发之前定义的计时器绝不能被调用删除)。

 SingleOutputStreamOperator<Events> abandonment = stream.keyBy(e -> e.id)
.process(new KeyedProcessFunctionName());


public class KeyedProcessFunctionNameextends KeyedProcessFunction<String,Event,Event> {

@Override
    public void processElement(Event e,Context ctx,Collector<Event> out) throws Exception {
        if (e.value.equalsIgnoreCase("B")) {
{
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime());
        }

        if (stateTwo.value() == null && e.value.equalsIgnoreCase("A")) {
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + SOME_FIXED_TIME_IN_LONG);
        }
}

}


@Override
    public void onTimer(long timestamp,OnTimerContext ctx,Collector<Event> out) throws Exception {

/*when this timer is been called because of B it must not been called because of a prevIoUs timer set because of A*/
}
}

有什么想法吗?

解决方法

定时器总是(隐式地)绑定到一个键上。创建计时器时,它与正在处理的事件的键(或当前触发的计时器的键)相关联。同样,您只能删除与当前上下文中的键关联的计时器。如果定时器删除似乎不起作用,也许这就是原因。

要记住的另一个事实是计时器已进行重复数据删除。换句话说,对于任何给定的(键,时间戳)对,最多可以有一个事件时间计时器和一个处理时间计时器。后续尝试为相同的密钥和时间戳注册另一个计时器将被忽略。

有时使用键控状态来记住当计时器(对于同一个键)触发时应该做什么是有帮助的。如果每个键有多个计时器,您可以使用由时间戳索引的 MapState 为每个计时器保留一些状态。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...