How to implement a Flink CountTriggerWithTimeout with a dynamic timeout for each incoming element

Problem description

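I am very new to Flink stream processing. Here is my requirement: alert the user when 2 or more elements are received within the last 20 seconds. If fewer than 2 elements are received within 20 seconds, no alert is raised; the count and the timer simply restart. The count and the interval differ from element to element.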

Here is my code:

dataStream
.keyBy("id")
.window(EventTimeSessionWindows.withDynamicGap((event) -> event.getThresholdInterval()))
.trigger(new CountTriggerWithTimeout<TimeWindow>())

Trigger code:
public class CountTriggerWithTimeout<W extends TimeWindow> extends Trigger<SystemEvent,W> {

  private ReducingStateDescriptor<Long> countState =
      new ReducingStateDescriptor<Long>("count",new Sum(),LongSerializer.INSTANCE);
  private ReducingStateDescriptor<Long> processedState =
      new ReducingStateDescriptor<Long>("processed",new Sum(),LongSerializer.INSTANCE);

  @Override
  public TriggerResult onElement(SystemEvent element,long timestamp,W window,TriggerContext ctx)
      throws Exception {
    ReducingState<Long> count = ctx.getPartitionedState(countState);
    ReducingState<Long> processed = ctx.getPartitionedState(processedState);
    count.add(1L);
    processed.add(0L);
    if (count.get() >= element.getThresholdCount() && processed.get() == 0) {
      processed.add(1L);
      return TriggerResult.FIRE_AND_PURGE;
    }
    if (timestamp >= window.getEnd()) { 
      return TriggerResult.PURGE;
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time,W window,TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time,W window,TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(W window,TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(countState).clear();
    ctx.getPartitionedState(processedState).clear();
  }
  
  @Override
  public boolean canMerge() {
    return true;
  }

  class Sum implements ReduceFunction<java.lang.Long> {
    @Override
    public Long reduce(Long value1,Long value2) throws Exception {
      return value1 + value2;
    }
  }
}

Earlier, I was using:

dataStream
.timeWindow(Time.seconds(1))
.trigger(new CountTriggerWithTimeout<TimeWindow>())

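and everything worked fine. Because the window duration has to be read from the element, I switched to EventTimeSessionWindows and added canMerge() to the trigger. Since then, nothing works: clear() is never called, and neither are onProcessingTime() or onEventTime(). I also see that the timestamp is always set to the same value, regardless of when the element is received.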

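My requirement is to "fire and purge" when count >= threshold within event.getThresholdInterval(). If the count stays below the threshold within that interval, the count and the timer should simply restart.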

Please help me solve this problem.

Thanks...

Solution

Why not use a simple 20-second tumbling window and count the elements in it:

source
        .keyBy("id")
        .timeWindow(Time.seconds(20))
        .process(new ProcessWindowFunction<Tuple2<String,Integer>,String,Tuple,TimeWindow>() {
            @Override
            public void process(Tuple key,Context ctx,Iterable<Tuple2<String,Integer>> in,Collector<String> out) throws Exception {
                // Emit an alert only if the window collected at least 2 elements.
                if (Lists.newArrayList(in).size() >= 2) {
                    out.collect("Two or more elements between "
                            + Instant.ofEpochMilli(ctx.window().getStart())
                            + " " + Instant.ofEpochMilli(ctx.window().getEnd()));
                }
            }
        });
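
The tumbling window above hard-codes 20 seconds and a threshold of 2, while the question asks for a count and an interval that come from each element. As a rough illustration of that per-key bookkeeping, here is a minimal, framework-free sketch in plain Java. It is not Flink API: `ThresholdAlertSketch`, `KeyState` and `onElement` are hypothetical names, and the sketch assumes that the interval is taken from the element that opens it and that timestamps arrive in order. In a real job the two fields would presumably live in keyed state and the deadline would be a registered timer.

```java
public class ThresholdAlertSketch {

    /** Hypothetical per-key state: element count and deadline of the open interval. */
    public static final class KeyState {
        private long count = 0;
        private long deadlineMs = Long.MIN_VALUE; // no interval is open yet

        /**
         * Processes one element and returns true if an alert should be raised.
         * Assumption: the interval is defined by the element that opens it.
         */
        public boolean onElement(long timestampMs, long thresholdCount, long thresholdIntervalMs) {
            // No open interval, or the open one has expired: restart count and clock.
            if (count == 0 || timestampMs >= deadlineMs) {
                count = 0;
                deadlineMs = timestampMs + thresholdIntervalMs;
            }
            count++;
            if (count >= thresholdCount) {
                count = 0; // equivalent of "fire and purge": alert, then start over
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        KeyState state = new KeyState();
        System.out.println(state.onElement(0L, 2, 20_000L));      // false: first element
        System.out.println(state.onElement(5_000L, 2, 20_000L));  // true: 2 elements within 20 s
        System.out.println(state.onElement(30_000L, 2, 20_000L)); // false: new interval opens
        System.out.println(state.onElement(55_000L, 2, 20_000L)); // false: 25 s later, restarted
        System.out.println(state.onElement(60_000L, 2, 20_000L)); // true: 2 elements within 20 s
    }
}
```

Unlike a tumbling window, the interval here starts at the first element rather than at a fixed boundary, so two elements a few seconds apart are always counted together.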