问题描述
我正在使用Flink CEP来检测来自Kafka的事件的模式。为了简单起见,事件只有一种类型。我正在尝试检测连续事件流中字段值的变化。代码如下所示
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
.filter(...)
.map(...)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
)
.keyBy(...)(Typeinformation.of(classOf[...]))
val pattern: Pattern[Event,_] =
Pattern.begin[Event]("start",AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
.next("middle")
.oneOrMore()
.optional()
.where(new IterativeCondition[Event] {
override def filter(event: Event,ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.next("end").times(1)
.where(new IterativeCondition[Event] {
override def filter(event: Event,ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
!startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.within(Time.seconds(30))
Kafka主题有104个分区,事件在各个分区之间均匀分布。当我提交作业时,parallelism
设置为104。
在Web UI中,有两项任务:第一项是Source->filter->map->timestamp/watermark
;第二项是CepOperator->sink
。第二个是keyBy
。每个任务有104个并行度。
子任务的工作负载不均衡,应来自high
。子任务之间的水印有所不同,但是它们开始固定在一个值上,很长时间没有变化。从日志中,我可以看到CEP不断评估事件,并将匹配的结果推送到下游接收器。
事件速率为10k / s,第一个任务的反压保持ok
,第二个任务的反压保持import 'package:intl/intl.dart';
class Sample extends StatelessWidget {
@override
Widget build(BuildContext context) {
return Scaffold(
body: Container(
alignment: Alignment.center,child: Text(DateFormat('MMM dd,yyyy h:mm:ss a')
.format(DateTime.parse(DateTime.Now().toString())))),);
}
}
。
请帮助解释CEP中发生了什么以及如何解决该问题
谢谢
解决方法
考虑到您的问题后,我正在修改我的答案。
听起来CEP正在继续产生火柴,并且将它们推到水槽,但是CEP +水槽的任务正在产生很高的背压。有助于确定背压的原因。
如果可以从所有分区读取事件,而水印只是勉强前进,则听起来背压足够严重,根本无法吸收事件。
我怀疑
- CEP引擎中的组合努力和/或
- 足以使水槽跟不上的比赛
可能的原因。
获取更多见解的一些想法:
(1)尝试使用探查器确定CepOperator是否是瓶颈,并可能确定它在做什么。
(2)禁用CepOperator和接收器之间的运算符链接以隔离CEP-仅作为调试步骤。这将使您(通过度量标准和反压监视)更好地了解CEP和接收器各自在做什么。
(3)在较小的设置中进行测试,然后扩展CEP日志记录。