为什么AssignerWithPunctuatedWatermarks在我的数据流中不起作用?

问题描述

我有问题。
我在程序中间使用了.assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks(60000))(在一些过滤器,地图和其他Apache Flink运算符之后)。
我有简单的模式:

begin("start",AfterMatchSkipStrategy.skipToLast("end"))
  .where(new SimpleConditionA())
  .followedBy("end")
  .where(new SimpleConditionB())
  .within(Times.minutes(5));

我从Apache kafka中读取了一条消息(以下是事件时间戳记)
消息A:A.timestamp = 11:50:00
消息B:B.timestamp = 11:51:00

public class MyAssignerWithPunctuatedWatermark implements AssignerWithPunctuatedWatermark{

  private long maxOutOfOrderness;
  private long currentMaxTimestamp;
  /*there are a Constructor*/

  public long extractTimestamp (Event e,long l){
    long timestamp = e.timestamp;
    if(timestamp > currentMaxTimestamp){
      currentMaxTimestamp = timestamp;
    }
    return timestamp;
  }

  public Watermark checkAndGetNextWatermark(Event e,long l){
    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  }
}

然后我在Apache Kafka中输入消息A-没关系!
然后,我在Apache Kafka中输入了一条消息B-模式未组装。 (我在SimpleCondtionB中使用日志日志,消息B在SimpleCondtion中不存在)。为什么????
当我使用.assignTimestampsAndWatermarks(new IngestionTimeExtractor())时。一切都还好。但是我需要参加一个迟到的活动(迟到不超过1分钟)。

解决方法

有必要在{em> 之前执行assignTimestampsAndWatermarks,这取决于时序。 CEP取决于assignTimestampsAndWatermarks提供的时序信息,以便对输入流进行排序并处理within(duration)约束。

为了正确地将模式与可能是乱序的事件时间流进行匹配,CEP必须首先根据传入事件的时间戳对传入事件进行排序。作为此过程的一部分,每个传入事件都将保留在缓冲区中,直到当前水印通过其时间戳为止-因为到那时为止,更早的事件可能仍会到达(而不被视为 late )。带有当前水印之前的时间戳的最新事件或者被CEP丢弃,或者被发送到侧面输出(如果已配置)。

因此,为了使您的模式能够处理消息B,您必须首先注入一个时间戳,该事件的时间戳晚于11:51:00(消息B的时间)才能满足maxOutOfOrderness您的要求已在您的水印生成器中使用过。

使用IngestionTimeExtractor时事件不会发生混乱,因此不需要此排序步骤,并且可以毫不延迟地处理消息B。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...