Watermark在Flink CEP中远远落后

问题描述

我正在使用Flink CEP来检测来自Kafka的事件的模式。为了简单起见,事件只有一种类型。我正在尝试检测连续事件流中字段值的变化。代码如下所示

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(TypeInformation.of(classOf[...]))
    
val pattern: Pattern[Event,_] = 
          Pattern.begin[Event]("start",AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] {
             override def filter(event: Event,ctx:...): Boolean = {
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             }
          })
          .next("end").times(1)
          .where(new IterativeCondition[Event] {
             override def filter(event: Event,ctx:...): Boolean = {
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            }
          })
          .within(Time.seconds(30))

Kafka主题有104个分区,事件在各个分区之间均匀分布。当我提交作业时,parallelism设置为104。

在Web UI中,有两项任务:第一项是Source->filter->map->timestamp/watermark;第二项是CepOperator->sink。第二个是keyBy。每个任务有104个并行度。

子任务的工作负载不均衡,应来自high。子任务之间的水印有所不同,但是它们开始固定在一个值上,很长时间没有变化。从日志中,我可以看到CEP不断评估事件,并将匹配的结果推送到下游接收器。

事件速率为10k / s,第一个任务的反压保持ok,第二个任务的反压保持import 'package:intl/intl.dart'; class Sample extends StatelessWidget { @override Widget build(BuildContext context) { return Scaffold( body: Container( alignment: Alignment.center,child: Text(DateFormat('MMM dd,yyyy h:mm:ss a') .format(DateTime.parse(DateTime.now().toString())))),); } }

请帮助解释CEP中发生了什么以及如何解决该问题

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)