Watermark在Flink CEP中远远落后

问题描述

我正在使用Flink CEP来检测来自Kafka的事件的模式。为了简单起见,事件只有一种类型。我正在尝试检测连续事件流中字段值的变化。代码如下所示

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(Typeinformation.of(classOf[...]))
    
val pattern: Pattern[Event,_] = 
          Pattern.begin[Event]("start",AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] {
             override def filter(event: Event,ctx:...): Boolean = {
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             }
          })
          .next("end").times(1)
          .where(new IterativeCondition[Event] {
             override def filter(event: Event,ctx:...): Boolean = {
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            }
          })
          .within(Time.seconds(30))

Kafka主题有104个分区,事件在各个分区之间均匀分布。当我提交作业时,parallelism设置为104。

在Web UI中,有两项任务:第一项是Source->filter->map->timestamp/watermark;第二项是CepOperator->sink。第二个是keyBy。每个任务有104个并行度。

子任务的工作负载不均衡,应来自high。子任务之间的水印有所不同,但是它们开始固定在一个值上,很长时间没有变化。从日志中,我可以看到CEP不断评估事件,并将匹配的结果推送到下游接收器。

事件速率为10k / s,第一个任务的反压保持ok,第二个任务的反压保持import 'package:intl/intl.dart'; class Sample extends StatelessWidget { @override Widget build(BuildContext context) { return Scaffold( body: Container( alignment: Alignment.center,child: Text(DateFormat('MMM dd,yyyy h:mm:ss a') .format(DateTime.parse(DateTime.Now().toString())))),); } }

请帮助解释CEP中发生了什么以及如何解决该问题

谢谢

解决方法

考虑到您的问题后,我正在修改我的答案。

听起来CEP正在继续产生火柴,并且将它们推到水槽,但是CEP +水槽的任务正在产生很高的背压。有助于确定背压的原因。

如果可以从所有分区读取事件,而水印只是勉强前进,则听起来背压足够严重,根本无法吸收事件。

我怀疑

  1. CEP引擎中的组合努力和/或
  2. 足以使水槽跟不上的比赛

可能的原因。

获取更多见解的一些想法:

(1)尝试使用探查器确定CepOperator是否是瓶颈,并可能确定它在做什么。

(2)禁用CepOperator和接收器之间的运算符链接以隔离CEP-仅作为调试步骤。这将使您(通过度量标准和反压监视)更好地了解CEP和接收器各自在做什么。

(3)在较小的设置中进行测试,然后扩展CEP日志记录。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...