Time out a CEP pattern if the next event is not received within a given time interval

Problem description

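I am new to Flink and am trying out a POC in which a CEP pattern should time out if no event is received for some time x that is longer than the time specified in the pattern's within() window.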

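import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// MyEvent, ItmAtomicEvent and JSONObject come from the asker's own project and are not shown.
public class MyCEPApplication {

    // Not declared in the original snippet; assumed to be a static field used only for debug output.
    private static long timeDifference;

    public static void main(String[] args) throws Exception {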


        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","localhost:9092");
        properties.setProperty("group.id","consumer_group");

        FlinkKafkaConsumer<String> inputSignal = new FlinkKafkaConsumer<>("consumer_topic",new SimpleStringSchema(),properties);

        DataStream<String> inputSignalKafka = streamExecutionEnvironment.addSource(inputSignal);


        DataStream<MyEvent> eventDataStream = inputSignalKafka.map(new MapFunction<String,MyEvent>() {
            @Override
            public MyEvent map(String value) throws Exception {
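                // Local variable renamed so that it no longer hides the MyEvent type name.
                ItmAtomicEvent event = new ItmAtomicEvent();
                event.setJsonObject(new JSONObject(value));
                // The timestamp is the wall-clock time at ingestion, not a time carried by the event itself.
                event.setAttribute("time",System.currentTimeMillis());
                return event;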
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
            @Override
            public long extractTimestamp(MyEvent event,long currentTimestamp) {
                System.out.println("TIMESTAMP: " +(long)event.getAttribute("time").get());
                System.out.println("Time Difference: " +((long)(event.getAttribute("time").get()) - timeDifference));
                timeDifference = (long)(event.getAttribute("time").get());
                return (long)event.getAttribute("time").get();
            }
            @Override
            public Watermark checkAndGetNextWatermark(MyEvent lastElement,long extractedTimestamp) {
                return new Watermark(extractedTimestamp);
            }
        });


        eventDataStream.print("Source=======================================>");
        
        Pattern<MyEvent,?> pattern =
                Pattern.<MyEvent>begin("start")
                    .where(new SimpleCondition<MyEvent>() {
                        @Override
                        public boolean filter(MyEvent event) {
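                            // Placeholder from the question: comparing a MyEvent to a String only matches if MyEvent.equals() is written for it.
                            return event.equals("Event1");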
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<MyEvent>() {
                        @Override
                        public boolean filter(MyEvent event) {
                            return event.equals("Event2");
                        }
                    }).within(Time.seconds(10));


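            // cepPatternMatching is not defined in the snippet; CEP.pattern is the standard way to apply a pattern to a stream.
            PatternStream<MyEvent> patternStream = CEP.pattern(eventDataStream,pattern);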

            OutputTag<MyEvent> timedout = new OutputTag<MyEvent>("timedout") {};
            


            SingleOutputStreamOperator<MyEvent> result = patternStream.flatSelect(
                    timedout,new PatternFlatTimeoutFunction<MyEvent,MyEvent>() {
                        @Override
                        public void timeout(Map<String,List<MyEvent>> pattern,long timeoutTimestamp,Collector<MyEvent> out) throws Exception {
                            // The map is keyed by pattern stage name ("start", "end"); a timed-out partial match holds only "start".
                            if(null != pattern.get("start")){
                                for (MyEvent timedOutEvent :pattern.get("start")){
                                    System.out.println("TimedOut Event : "+timedOutEvent.getField(0));
                                    out.collect(timedOutEvent);
                                }
                            }
                        }
                    },new PatternFlatSelectFunction<MyEvent,MyEvent>() {
                        @Override
                        public void flatSelect(Map<String,List<MyEvent>> pattern,Collector<MyEvent> out) throws Exception {

                            // Emit the first event of a complete match; the map is keyed by pattern stage name, not "CustomerId".
                            out.collect(pattern.get("start").get(0));
                        }
                    }
            );


            result.print("=============CEP Pattern Detected==================");


            DataStream<MyEvent> timedOut = result.getSideOutput(timedout);
           
            timedOut.print("=============CEP TimedOut Pattern Detected==================");


        streamExecutionEnvironment.execute("CEPtest");

    }
}

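Even when no event arrives for more than 10 seconds, the timed-out event is not printed. I even tried commenting out the PatternFlatSelectFunction code. Is there a way to make the pattern time out if no event is received within a given x seconds? The links below discuss the same problem, but none of them helped me. Please help me resolve this.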

1)Apache Flink CEP how to detect if event did not occur within x seconds?

2)https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/cep.html#handling-timed-out-partial-patterns

3)https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/solutions/datastream_java/cep/LongRidesCEPSolution.java

Solution

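Your application uses event time, so you need to arrange for a sufficiently large watermark to be generated even though no events are arriving. A partial match can only time out once the watermark passes the end of its within() window, and your assigner emits a watermark only when an event arrives. If you want to artificially advance the current watermark while the source is idle, you can use this example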
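As a rough illustration of that idea (this is not the linked example itself), the logic of advancing the watermark during idle periods can be sketched in plain Java, independent of Flink's interfaces. The class name `IdleAwareWatermarkTracker`, its methods, and the `maxIdleMillis` threshold are all hypothetical names chosen for this sketch; wall-clock time is passed in explicitly so the behavior is easy to follow.

```java
/**
 * Hypothetical sketch: tracks event-time progress and keeps the watermark
 * moving when no events arrive. Not part of Flink's API.
 */
final class IdleAwareWatermarkTracker {
    private final long maxIdleMillis;                  // idle time after which the watermark is advanced artificially
    private long maxTimestamp = Long.MIN_VALUE;        // highest event timestamp seen so far
    private long lastEventWallClock = Long.MIN_VALUE;  // wall-clock time at which the last event arrived
    private long lastWatermark = Long.MIN_VALUE;       // last watermark returned, so it never moves backwards

    IdleAwareWatermarkTracker(long maxIdleMillis) {
        this.maxIdleMillis = maxIdleMillis;
    }

    /** Called for every event; returns the timestamp to assign to it. */
    long onEvent(long eventTimestamp, long nowMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventWallClock = nowMillis;
        return eventTimestamp;
    }

    /** Called periodically; returns the current watermark. */
    long currentWatermark(long nowMillis) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events seen yet, nothing to advance
        }
        long idleFor = nowMillis - lastEventWallClock;
        long candidate = maxTimestamp;
        if (idleFor > maxIdleMillis) {
            // Source is idle: move the watermark forward by the wall-clock time that has passed.
            candidate = maxTimestamp + idleFor;
        }
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkTracker tracker = new IdleAwareWatermarkTracker(5_000L);
        tracker.onEvent(1_000L, 1_000L);
        System.out.println(tracker.currentWatermark(2_000L));  // prints 1000 (source not yet idle)
        System.out.println(tracker.currentWatermark(12_000L)); // prints 12000 (idle for 11 s, watermark advanced)
    }
}
```

In a Flink 1.10 job, one way to apply this would be to put the same logic into an `AssignerWithPeriodicWatermarks`, with `onEvent` corresponding to `extractTimestamp` and `currentWatermark` to `getCurrentWatermark`, instead of the punctuated assigner used in the question. The second value printed above lies beyond the 10-second window of a match started at timestamp 1000, which is what would let that partial match time out.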

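Given that your events do not carry event-time timestamps, why not simply use processing time and avoid this problem altogether? (Do note, however, the limitation mentioned in https://stackoverflow.com/a/50357721/2000823.)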
