How do I broadcast CEP patterns and iterate over multiple patterns in CEP?

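Problem description

I am trying to apply a list of patterns to a PatternStream in Flink CEP; my attempt is in the code below. I am new to Flink, so I am not sure whether this is a sensible approach. In particular, I am not sure whether a DataStream can be created inside the processElement method of a KeyedBroadcastProcessFunction.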

    // Broadcast state descriptor: pattern name -> pattern definition
    public static final MapStateDescriptor<String, String> patternDescriptor =
            new MapStateDescriptor<>("CEPPatternList",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<JSONObject> source =
            env.fromCollection(Arrays.asList(event, event2, event3)); // KafkaSource

    DataStream<Tuple2<String, JSONObject>> eventDataStream =
            source.map(new JSONObjectToTuple2());

    DataStream<Tuple2<String, JSONObject>> eventStream = eventDataStream.keyBy(0);

    // Stream of (pattern name, pattern definition) pairs
    DataStream<Tuple2<String, String>> patternStream =
            env.fromElements(pattern).flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                    out.collect(new Tuple2<>("PATTERN_1", value));
                }
            });

    BroadcastStream<Tuple2<String, String>> broadcastStream =
            patternStream.broadcast(patternDescriptor);

    DataStream<Tuple2<String, JSONObject>> output =
            eventStream.connect(broadcastStream).process(
                    new KeyedBroadcastProcessFunction<String, Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple2<String, JSONObject>>() {
                @Override
                public void processElement(Tuple2<String, JSONObject> value, ReadOnlyContext ctx,
                                           Collector<Tuple2<String, JSONObject>> out) throws Exception {

                    // Try every pattern currently held in broadcast state
                    for (Map.Entry<String, String> patterns :
                            ctx.getBroadcastState(patternDescriptor).immutableEntries()) {

                        String patternValue = patterns.getValue();

                        // Note: env is the variable from the enclosing method, used inside the function
                        DataStream<Tuple2<String, JSONObject>> eventStream =
                                env.fromElements(value);

                        PatternStream<Tuple2<String, JSONObject>> patternStream =
                                cepPatternMatching.compile(patternValue, eventStream);

                        OutputTag<Tuple2<String, JSONObject>> timedout =
                                new OutputTag<Tuple2<String, JSONObject>>("timedout") {
                                };

                        SingleOutputStreamOperator<Tuple2<String, JSONObject>> result = patternStream.flatSelect(
                                timedout, new EventTimeOut(), new PatternFlatSelect()
                        );

                        result.flatMap(new FlatMapFunction<Tuple2<String, JSONObject>, Object>() {
                            @Override
                            public void flatMap(Tuple2<String, JSONObject> value, Collector<Object> out) throws Exception {
                                out.collect(value);
                            }
                        });

                        ctx.output(unMatched, value);
                    }
                }

                @Override
                public void processBroadcastElement(Tuple2<String, String> value, Context ctx,
                                                    Collector<Tuple2<String, JSONObject>> out) throws Exception {
                    System.out.println("Pattern Name: " + value.f0);
                    System.out.println("Pattern Condition: " + value.f1);
                    // Store the incoming pattern in broadcast state
                    ctx.getBroadcastState(patternDescriptor).put(value.f0, value.f1);
                }
            });

I am not sure whether this is the right approach, but I get the following error:

 Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the StreamExecutionEnvironment is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
    at org.apache.flink.streaming.api.datastream.BroadcastConnectedStream.clean(BroadcastConnectedStream.java:249)
    at org.apache.flink.streaming.api.datastream.BroadcastConnectedStream.process(BroadcastConnectedStream.java:162)
    at org.apache.flink.streaming.api.datastream.BroadcastConnectedStream.process(BroadcastConnectedStream.java:139)
    at CEPPatternMatchingApp.main(CEPPatternMatchingApp.java:122)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 7 more

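I am not serializing anything explicitly anywhere in my code, so I do not understand why this error occurs. I even tried implementing the Serializable interface, but the error persists.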
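
A possible explanation, offered as a sketch rather than a confirmed diagnosis: the stack trace shows ClosureCleaner calling InstantiationUtil.serializeObject on the function passed to process(), so the serialization is done by Flink, not by user code. An anonymous class that uses a local variable from the enclosing method (here, env) keeps a reference to it, and Java serialization then fails on that captured object. The plain-Java example below reproduces only this mechanism without Flink; FakeEnvironment and UserFunction are hypothetical stand-ins invented for illustration and are not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    /** Hypothetical stand-in for StreamExecutionEnvironment: deliberately NOT Serializable. */
    static class FakeEnvironment {
        String describe(String value) {
            return "env:" + value;
        }
    }

    /** Hypothetical stand-in for a user function interface that must be Serializable. */
    interface UserFunction extends Serializable {
        String apply(String value);
    }

    /** Returns true if plain Java serialization of the object succeeds. */
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** The anonymous class uses the local variable env, so it holds a reference to it. */
    public static UserFunction capturing() {
        FakeEnvironment env = new FakeEnvironment();
        return new UserFunction() {
            @Override
            public String apply(String value) {
                return env.describe(value);
            }
        };
    }

    /** The anonymous class references nothing from the enclosing method. */
    public static UserFunction selfContained() {
        return new UserFunction() {
            @Override
            public String apply(String value) {
                return "plain:" + value;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("capturing: " + isSerializable(capturing()));
        System.out.println("self-contained: " + isSerializable(selfContained()));
    }
}
```

Running main prints "capturing: false" and "self-contained: true". If the same mechanism is at work in the code above, making the function class itself Serializable would not help, because the object that cannot be serialized is the captured environment, not the function.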

Can anyone help? Is there a way to broadcast CEP patterns and iterate over them one by one, applying each to the data stream?
