Flink Co组外部联接因高背压而失败

问题描述

我在Flink中有两个流stream1每秒有70000条记录,而stream2可能有或没有数据。

// Ingest the High Frequency Analog Stream
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
    environment
        .addSource(createHFAConsumer())
        .name("hfa source");

SingleOutputStreamOperator<EVWindow> stream2 = environment
        .addSource(createHFDConsumer())
        .name("hfd source");
    
DataStream<Message> pStream =
        stream1
        .coGroup(stream2)
        .where(obj -> obj.getid())
        .equalTo(ev -> ev.getid())
            .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
            .evictor(Timeevictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        .apply(new CalculateCoGroupFunction());

当两个Streams都具有数据时,这工作得很好,但是当stream2没有数据时,作业将以很高的背压失败。 cpu利用率也飙升了200%。

在这种情况下如何处理外部联接

解决方法

我认为问题在于空闲流缺少水印会阻碍整个水印。每当连接多个流时,生成的水印就是传入水印的最小值。然后,这可能会导致您遇到的问题。

您有两种选择:

  1. stream2的水印设置为Watermark.MAX_WATERMARK,从而stream1可以完全控制水印。
  2. 以某种方式检测到stream2处于空闲状态,尽管缺少事件,但人为地使水印前进。这是an example
,

感谢David Anderson的指针

RCA

当我尝试在自己的流周围创建翻转窗口时,出现了主要问题。

根据Flink Documentation

简而言之,应在该窗口的第一个元素到达时立即创建一个窗口,并且当时间(事件或处理时间)超过其结束时间戳加用户指定的允许时间时,该窗口将被完全删除。迟到

由于没有stream2的传入数据,因此窗口从未实现。正如大卫指出的

无论何时连接多个流,生成的水印都是传入水印的最小值

这意味着flink在等待stream1时正在缓冲来自stream2的数据,并最终导致高背压和最终OOM。

解决方案

我创建了一个外部脚本,以所需的时间间隔将虚拟心跳消息发送到Kafka流stream2,并在我的应用程序中添加了忽略这些消息进行计算的逻辑。

这迫使stream2stream1推进水印,并且窗口被移出上下文。

,

如前所述:

无论何时连接多个流,生成的水印都是 最低的传入水印

这意味着flink在等待时正在缓冲来自stream1的数据 stream2并最终导致高背压,最后导致 OOM。

它适用于coGroup()类中的DataStream<T>方法,该方法返回CoGroupedStreams<T,T2>

为避免这种行为,我们可以使用union(DataStream<T>... streams)方法,该方法返回一个简单的DataStream<T>,其中水印将像通常的流一样前进。

我们需要解决的唯一问题是为两个流都具有一个通用的架构(类)。我们可以将聚合类用于两个字段:

public class Aggregator {

  private FlatHighFrequencyAnalog flatHighFrequencyAnalog;
  private EVWindow evWindow;

  public Aggregator(FlatHighFrequencyAnalog flatHighFrequencyAnalog) {
    this.flatHighFrequencyAnalog = flatHighFrequencyAnalog;
  }

  public Aggregator(EVWindow evWindow) {
    this.evWindow = evWindow;
  }

  public FlatHighFrequencyAnalog getFlatHighFrequencyAnalog() {
    return flatHighFrequencyAnalog;
  }

  public EVWindow getEVWindow() {
    return evWindow;
  }
}

此外,更通用的方法是使用Either<L,R>中的org.apache.flink.types类。

最后总结一下我们将要得到的:

SingleOutputStreamOperator<Either<EVWindow,FlatHighFrequencyAnalog>> stream1 =
    environment
        .addSource(createHFAConsumer())
        .map(hfa -> Either.Left(hfa));

SingleOutputStreamOperator<Either<EVWindow,FlatHighFrequencyAnalog>> stream2 = 
    environment
        .addSource(createHFDConsumer())
        .map(hfd -> Either.Right(hfd));
    
DataStream<Message> pStream =
        stream1
          .union(stream2)
          .assignTimestampsAndWatermarks(
              WatermarkStrategy
                  .<Either<EVWindow,FlatHighFrequencyAnalog>>forBoundedOutOfOrderness(
                    ofSeconds(MAX_OUT_OF_ORDERNESS))
                .withTimestampAssigner((input,timestamp) -> input.isLeft() ? input.left().getTimeStamp() : input.right().getTimeStamp()))
          .keyBy(value -> value.isLeft() ? value.left().getId() : value.right().getId())
          .window(TumblingEventTimeWindows.of(Time.minutes(MINUTES)))
          .process(new ProcessWindowFunction());

要在处理功能中获取不同的集合

List<EVWindow> evWindows =
        Streams.stream(elements)
            .filter(Either::isLeft)
            .map(Either::left)
            .collect(Collectors.toList());

List<FlatHighFrequencyAnalog> highFrequencyAnalogs =
        Streams.stream(elements)
            .filter(Either::isRight)
            .map(Either::right)
            .collect(Collectors.toList());