Apache Flink 在特定键上加入不同的数据流

问题描述

我有两个DataStreams,第一个叫做DataStream<String> source,它从消息代理接收记录,第二个是SingleOutputoperator<Event> events,它是将源映射到Event.class

我有一个用例需要使用 SingleOutputoperator<Event> events,其他用例需要使用 DataStream<String> source。在使用 DataStream<String> source 的用例之一中,我需要在应用一些过滤器后加入 SingleOutputoperator<String> result 并避免将 source 再次映射到 Event.class,因为我已经有了该操作已完成且 Stream,我需要将每条记录搜索SingleOutputoperator<String> result 中的 SingleOutputoperator<Event> events 并应用另一个映射来导出 SingleOutputoperator<EventOutDto> out

这是一个例子:

DataStream<String> source = env.readFrom(source);
SingleOutputoperator<Event> events = source.map(s -> mapper.readValue(s,Event.class));


public void filterandJoin(DataStream<String> source,SingleOutputoperator<Event> events){
  
  SingleOutputoperator<String> filtered = source.filter(s -> new FilterFunction());
  
  SingleOutputoperator<EventOutDto> result = (this will be the result of search each record 
      based on id in the filtered stream into the events stream where the id must match and return the event if found)
.map(event -> new EventOutDto(event)).addSink(new RichSinkFunction());
}

我有这个代码

filtered.join(events)
                    .where(k -> {
                        JsonNode tree = mapper.readTree(k);
                        String id = "";
                        if (tree.get("Id") != null) {
                            id = tree.get("Id").asText();
                        }
                        return id;
                    })
                    .equalTo(e -> {
                        return e.Id;
                    })
                    .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                    .apply(new JoinFunction<String,Event,BehSingleEventTriggerDTO>() {
                        @Override
                        public EventOutDto join(String s,Event event) throws Exception {
                            return new EventOutDto(event);
                        }
                    })
                    .addSink(new SinkFunction());

在上面的代码中一切正常,ids 是相同的,所以基本上 where(id).equalTo(id) 应该可以工作,但过程永远不会到达 apply 函数

观察:Watermark 被分配了相同的时间戳

问题:

  • 知道为什么吗?
  • 我自己解释清楚了吗?

解决方法

我通过这样做解决了连接:

SingleOutputStreamOperator<ObjectDTO> triggers = candidates
                    .keyBy(new KeySelector())
                    .intervalJoin(keyedStream.keyBy(e -> e.Id))
                    .between(Time.milliseconds(-2),Time.milliseconds(1))
                    .process(new new ProcessFunctionOne())
                    .keyBy(k -> k.otherId)
                    .process(new ProcessFunctionTwo());

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...