问题描述
我在Flink中有两个流stream1
每秒有70000条记录,而stream2
可能有或没有数据。
// Ingest the High Frequency Analog Stream
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
environment
.addSource(createHFAConsumer())
.name("hfa source");
SingleOutputStreamOperator<EVWindow> stream2 = environment
.addSource(createHFDConsumer())
.name("hfd source");
DataStream<Message> pStream =
stream1
.coGroup(stream2)
.where(obj -> obj.getid())
.equalTo(ev -> ev.getid())
.window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
.evictor(Timeevictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
.apply(new CalculateCoGroupFunction());
当两个Streams都具有数据时,这工作得很好,但是当stream2没有数据时,作业将以很高的背压失败。 cpu利用率也飙升了200%。
在这种情况下如何处理外部联接
解决方法
我认为问题在于空闲流缺少水印会阻碍整个水印。每当连接多个流时,生成的水印就是传入水印的最小值。然后,这可能会导致您遇到的问题。
您有两种选择:
- 将
stream2
的水印设置为Watermark.MAX_WATERMARK
,从而stream1
可以完全控制水印。 - 以某种方式检测到
stream2
处于空闲状态,尽管缺少事件,但人为地使水印前进。这是an example。
感谢David Anderson的指针
RCA :
当我尝试在自己的流周围创建翻转窗口时,出现了主要问题。
简而言之,应在该窗口的第一个元素到达时立即创建一个窗口,并且当时间(事件或处理时间)超过其结束时间戳加用户指定的允许时间时,该窗口将被完全删除。迟到
由于没有stream2
的传入数据,因此窗口从未实现。正如大卫指出的
无论何时连接多个流,生成的水印都是传入水印的最小值
这意味着flink在等待stream1
时正在缓冲来自stream2
的数据,并最终导致高背压和最终OOM。
解决方案:
我创建了一个外部脚本,以所需的时间间隔将虚拟心跳消息发送到Kafka流stream2
,并在我的应用程序中添加了忽略这些消息进行计算的逻辑。
这迫使stream2
和stream1
推进水印,并且窗口被移出上下文。
如前所述:
无论何时连接多个流,生成的水印都是 最低的传入水印
和
这意味着flink在等待时正在缓冲来自stream1的数据 stream2并最终导致高背压,最后导致 OOM。
它适用于coGroup()
类中的DataStream<T>
方法,该方法返回CoGroupedStreams<T,T2>
。
为避免这种行为,我们可以使用union(DataStream<T>... streams)
方法,该方法返回一个简单的DataStream<T>
,其中水印将像通常的流一样前进。
我们需要解决的唯一问题是为两个流都具有一个通用的架构(类)。我们可以将聚合类用于两个字段:
public class Aggregator {
private FlatHighFrequencyAnalog flatHighFrequencyAnalog;
private EVWindow evWindow;
public Aggregator(FlatHighFrequencyAnalog flatHighFrequencyAnalog) {
this.flatHighFrequencyAnalog = flatHighFrequencyAnalog;
}
public Aggregator(EVWindow evWindow) {
this.evWindow = evWindow;
}
public FlatHighFrequencyAnalog getFlatHighFrequencyAnalog() {
return flatHighFrequencyAnalog;
}
public EVWindow getEVWindow() {
return evWindow;
}
}
此外,更通用的方法是使用Either<L,R>
中的org.apache.flink.types
类。
最后总结一下我们将要得到的:
SingleOutputStreamOperator<Either<EVWindow,FlatHighFrequencyAnalog>> stream1 =
environment
.addSource(createHFAConsumer())
.map(hfa -> Either.Left(hfa));
SingleOutputStreamOperator<Either<EVWindow,FlatHighFrequencyAnalog>> stream2 =
environment
.addSource(createHFDConsumer())
.map(hfd -> Either.Right(hfd));
DataStream<Message> pStream =
stream1
.union(stream2)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Either<EVWindow,FlatHighFrequencyAnalog>>forBoundedOutOfOrderness(
ofSeconds(MAX_OUT_OF_ORDERNESS))
.withTimestampAssigner((input,timestamp) -> input.isLeft() ? input.left().getTimeStamp() : input.right().getTimeStamp()))
.keyBy(value -> value.isLeft() ? value.left().getId() : value.right().getId())
.window(TumblingEventTimeWindows.of(Time.minutes(MINUTES)))
.process(new ProcessWindowFunction());
要在处理功能中获取不同的集合
List<EVWindow> evWindows =
Streams.stream(elements)
.filter(Either::isLeft)
.map(Either::left)
.collect(Collectors.toList());
List<FlatHighFrequencyAnalog> highFrequencyAnalogs =
Streams.stream(elements)
.filter(Either::isRight)
.map(Either::right)
.collect(Collectors.toList());