问题描述
我正在使用Flink CEP来检测来自Kafka的事件的模式。为了简单起见,事件只有一种类型。我正在尝试检测连续事件流中字段值的变化。代码如下所示
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
.filter(...)
.map(...)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
)
.keyBy(...)(TypeInformation.of(classOf[...]))
val pattern: Pattern[Event,_] =
Pattern.begin[Event]("start",AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
.next("middle")
.oneOrMore()
.optional()
.where(new IterativeCondition[Event] {
override def filter(event: Event,ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.next("end").times(1)
.where(new IterativeCondition[Event] {
override def filter(event: Event,ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
!startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.within(Time.seconds(30))
Kafka主题有104个分区,事件在各个分区之间均匀分布。当我提交作业时,parallelism
设置为104。
在Web UI中,有两项任务:第一项是Source->filter->map->timestamp/watermark
;第二项是CepOperator->sink
。第二个是keyBy
。每个任务有104个并行度。
子任务的工作负载不均衡,应来自high
。子任务之间的水印有所不同,但是它们开始固定在一个值上,很长时间没有变化。从日志中,我可以看到CEP不断评估事件,并将匹配的结果推送到下游接收器。
事件速率为10k / s,第一个任务的反压保持ok
,第二个任务的反压保持import 'package:intl/intl.dart';
class Sample extends StatelessWidget {
@override
Widget build(BuildContext context) {
return Scaffold(
body: Container(
alignment: Alignment.center,child: Text(DateFormat('MMM dd,yyyy h:mm:ss a')
.format(DateTime.parse(DateTime.now().toString())))),);
}
}
。
请帮助解释CEP中发生了什么以及如何解决该问题
谢谢
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)