如何从键入之前应用的不同过滤器中恢复KeyedStream

问题描述

我如何分散相同的keyedStream并根据不同的用例应用过滤器,而无需在过滤结束时创建新的keyedStream? 示例:

DataStream<Event> streamFiltered = RabbitMQConnector.eventStreamObject(env)
                .flatMap(new Consumer())
                .name("Event Mapper")
                .assignTimestampsAndWatermarks(new PeriodicExtractor())
                .name("Watermarks Added")
                .filter(new NullIdEventsFilterFunction())
                .name("Event Filter");

/*Now I will or need to send the same keyedStream for applying two different transformations with different filters but under the same keyed concept*/

/*Once I'd applied the filter I will receive back a SingleOutputStreamOperator and then I need to keyBy again*/
/*in a normal scenario I will need to do keyBy again,and I want to avoid that */

KeyedStream<T,T> keyed1 = streamFiltered.filter(x -> x.id != null).keyBy(key -> key.id); /*wants to avoid this*/
KeyedStream<T,T> keyed2= streamFiltered.filter(x -> x.id.lenght > 10).keyBy(key -> key.id);/*wants to avoid this*/

seeProduct(keyed1);
checkProduct(keyed2);

/*these are just an example,this two operations receive a keyedStream under the same concept but with different filters applied to the keyedStream already created and wants to reuse that same keyedStream after different filters to avoid a new creation*/
private static SingleOutputStreamOperator<EventProduct>seeProduct(KeyedStream<Event,String> stream) {
        return stream.map(x -> new EventProduct(x)).name("Event Product");
    }

private static SingleOutputStreamOperator<EventCheck>checkProduct(KeyedStream<Event,String> stream) {
        return stream.map(x -> new EventCheck(x)).name("Event Check");
    }

在正常情况下,每个单个过滤器函数都将返回SingleOutputStream,然后我需要再次执行keyBy(但是我已经有一个id为keyedStream的主意,要在过滤器之后获取它,我将需要通过再次创建一个新的KeyedStream)。例如,应用过滤器后,如何保留keyedStream概念?

解决方法

我认为,在您的情况下,side output功能会有所帮助-您可以针对每种过滤情况从基本keyed stream处获得单独的输出。

请在flink侧输出文档:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html中查看更多详细信息和示例。

类似的事情(用伪代码)应该对您有用:

final OutputTag<Tuple2<String,Event>> outputTag1 = new OutputTag<>("side-output-filter-1"){};
final OutputTag<Tuple2<String,Event>> outputTag2 = new OutputTag<>("side-output-filter-2"){};
DataStream<Event> keyedStream = source.keyby(x -> x.id);
     .process(new KeyedProcessFunction<Tuple,Tuple2<String,Event>,Event>> {
     @Override
      public void processElement(
          Tuple2<String,Event> value,Context ctx,Collector<Tuple2<String,Event>> out) throws Exception {
        // emit data to regular output
        out.collect(value);

        // emit data to side output
        ctx.output(outputTag1,value);
        ctx.output(outputTag2,value);
      }
})
/*for use case one I need to use the same keyed concept but apply a filter*/
DataStream<Tuple2<String,Event>> sideOutputStream1 = keyedStream.getSideOutput(outputTag1).filter(x -> x.id != null);

/*for use case two I need to use the same keyed concept but apply a filter*/
DataStream<Tuple2<String,Event>> sideOutputStream2 = keyedStream.getSideOutput(outputTag2).filter(x -> x.id.lenght > 10);
,

最简单的答案似乎是先应用过滤,然后再使用keyBy。

如果出于某种原因需要在过滤之前对流进行键分区(例如,您可能正在应用使用键分区状态的RichFilterFunction),则可以使用reinterpretAsKeyedStream重新建立键,而无需另一个keyBy的费用。

使用侧面输出是将流分成几个过滤的子流的好方法,但是这些输出流将不再是KeyedStreams。如果重新应用键选择器功能会产生与已经存在的分区完全相同的分区,则只能安全地使用reinterpretAsKeyedStream。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...