问题描述
输入数据为:
Arrays.asList(
Tuple3.of("1",100L,0),Tuple3.of("1",300L,500L,1),600L,750L,900L,1100L,1300L,2000L,4000L,6000L,8000L,10000L,10800L,10900L,11000L,11300L,0)
)
我想匹配tuple3.f2等于1的前10秒数据,期望结果是(1,500,1)到(1,10000,1),但是程序结果是:
[(1,(1,600,750,900,1100,1)]
Flink CEP模式:(Flink版本:1.11.1)
Pattern<Tuple3<String,Long,Integer>,Tuple3<String,Integer>> headSlicePattern =
Pattern.<Tuple3<String,Integer>>begin("up",AfterMatchSkipStrategy.skipPastLastEvent())
.where(new SimpleCondition<Tuple3<String,Integer>>() {
private static final long serialVersionUID = 3426292175809665737L;
@Override
public boolean filter(Tuple3<String,Integer> tuple3) throws Exception {
return tuple3.f2.equals(0);
}
})
.next("middle")
.where(new IterativeCondition<Tuple3<String,Integer>>() {
private static final long serialVersionUID = -3851397850893872739L;
@Override
public boolean filter(Tuple3<String,Integer> value,Context<Tuple3<String,Integer>> ctx) throws Exception {
return value.f2.equals(1);
}
}).timesOrMore(5).consecutive().within(Time.seconds(10));
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)