带时间戳的Flink计数器

问题描述

我正在阅读Flink示例CountWithTimestamp,下面是该示例的代码片段:

  @Override
    public void processElement(Tuple2<String,String> value,Context ctx,Collector<Tuple2<String,Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp,OnTimerContext ctx,Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String,Long>(result.key,result.count));
        }
    }
}

我的问题是,如果我在onTimer中删除if语句timestamp == result.lastModified + 60000(收集未触摸的stmt),而在processElement开始时将其替换为另一个if语句if(ctx.timestamp < current.lastModified + 60000) { deleteEventTimeTimer(current.lastModified + 60000)},则符号含义是否正确程序是一样的吗?在语义相同的情况下,一个版本相对于另一个版本有何偏好?

解决方法

您正确地认为删除计时器的实现具有相同的语义。实际上,我最近更改了我们的培训材料中使用的示例来做到这一点,因为我更喜欢这种方法。我认为更可取的原因是,所有复杂的业务逻辑都放在一个位置(在processElement中),每当调用onTimer时,您便确切知道要做什么,而不会提出任何问题。另外,它的性能更高,因为用于检查点并最终触发的计时器更少。

此示例是为文档编写的,可以删除计时器,但尚未更新。

一旦您经过注册页面,就可以找到我在这些幻灯片中提到的经过修改的示例https://training.ververica.com/decks/process-function/

FWIW,我最近还按照相同的思路将参考解决方案重新设计为相应的训练练习:https://github.com/apache/flink-training/tree/master/long-ride-alerts