问题描述
我有两个DataStreams
,第一个叫做DataStream<String> source
,它从消息代理接收记录,第二个是SingleOutputoperator<Event> events
,它是将源映射到Event.class
。
我有一个用例需要使用 SingleOutputoperator<Event> events
,其他用例需要使用 DataStream<String> source
。在使用 DataStream<String> source
的用例之一中,我需要在应用一些过滤器后加入 SingleOutputoperator<String> result
并避免将 source
再次映射到 Event.class
,因为我已经有了该操作已完成且 Stream
,我需要将每条记录搜索到 SingleOutputoperator<String> result
中的 SingleOutputoperator<Event> events
并应用另一个映射来导出 SingleOutputoperator<EventOutDto> out
。
这是一个例子:
DataStream<String> source = env.readFrom(source);
SingleOutputoperator<Event> events = source.map(s -> mapper.readValue(s,Event.class));
public void filterandJoin(DataStream<String> source,SingleOutputoperator<Event> events){
SingleOutputoperator<String> filtered = source.filter(s -> new FilterFunction());
SingleOutputoperator<EventOutDto> result = (this will be the result of search each record
based on id in the filtered stream into the events stream where the id must match and return the event if found)
.map(event -> new EventOutDto(event)).addSink(new RichSinkFunction());
}
filtered.join(events)
.where(k -> {
JsonNode tree = mapper.readTree(k);
String id = "";
if (tree.get("Id") != null) {
id = tree.get("Id").asText();
}
return id;
})
.equalTo(e -> {
return e.Id;
})
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.apply(new JoinFunction<String,Event,BehSingleEventTriggerDTO>() {
@Override
public EventOutDto join(String s,Event event) throws Exception {
return new EventOutDto(event);
}
})
.addSink(new SinkFunction());
在上面的代码中一切正常,ids
是相同的,所以基本上 where(id).equalTo(id)
应该可以工作,但过程永远不会到达 apply
函数。
观察:Watermark
被分配了相同的时间戳
问题:
- 知道为什么吗?
- 我自己解释清楚了吗?
解决方法
我通过这样做解决了连接:
SingleOutputStreamOperator<ObjectDTO> triggers = candidates
.keyBy(new KeySelector())
.intervalJoin(keyedStream.keyBy(e -> e.Id))
.between(Time.milliseconds(-2),Time.milliseconds(1))
.process(new new ProcessFunctionOne())
.keyBy(k -> k.otherId)
.process(new ProcessFunctionTwo());