如何使用flink CEP匹配前10秒的数据?

问题描述

输入数据为:

Arrays.asList(
                Tuple3.of("1",100L,0),Tuple3.of("1",300L,500L,1),600L,750L,900L,1100L,1300L,2000L,4000L,6000L,8000L,10000L,10800L,10900L,11000L,11300L,0)
        )

我想匹配tuple3.f2等于1的前10秒数据,期望结果是(1,500,1)到(1,10000,1),但是程序结果是:

[(1,(1,600,750,900,1100,1)]

Flink CEP模式:(Flink版本:1.11.1)

Pattern<Tuple3<String,Long,Integer>,Tuple3<String,Integer>> headSlicePattern =
                Pattern.<Tuple3<String,Integer>>begin("up",AfterMatchSkipStrategy.skipPastLastEvent())
                        .where(new SimpleCondition<Tuple3<String,Integer>>() {
                            private static final long serialVersionUID = 3426292175809665737L;

                            @Override
                            public boolean filter(Tuple3<String,Integer> tuple3) throws Exception {
                                return tuple3.f2.equals(0);
                            }
                        })
                        .next("middle")
                        .where(new IterativeCondition<Tuple3<String,Integer>>() {
                            private static final long serialVersionUID = -3851397850893872739L;

                            @Override
                            public boolean filter(Tuple3<String,Integer> value,Context<Tuple3<String,Integer>> ctx) throws Exception {
                                return value.f2.equals(1);
                            }
                        }).timesOrMore(5).consecutive().within(Time.seconds(10));

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...