问题描述
我有一个监听Kafka的flink进程。然后,消耗的消息将在并发哈希图中保存一段时间,然后需要下沉到cassandra。
操作员链类似于
DataStream<Message> datastream = KafkaSource.createsource();
DataStream<Message> decodededMessage = datastream.flatmap(new DecodeMessage());
decodedMessage.assigneTimestampsandWatermarks(new AscendingTimestampExtractor<Message>(){
public long extractAscendingTimestamp(Message m){
return message.getTimestamp();
}
}).keyBy((KeySelector<Message>) x-> x.getID())
.process(new Timerfunction())
.addSink(new MySink());
class TimerFunction extends KeyedProcessFunction<Integer,Message,Message>{
private ValueState<Message> x;
public void processElement(){
//some logic to create timestamp for one minute
context.timerService().registerEventTimeTimer(x.getTimestamp());
}
public void onTimer()
// output values on trigger
}
在使用eventime时我有些疑问
我需要介绍以下情况
-
ID为1的X消息在8小时1分1秒到达
-
ID为2的Y消息在8小时1分4秒到达
因为我使用Id作为密钥,所以这两个消息都应将计时器设置为在8小时2分0秒触发。 根据flink文档,如果计时器的时间戳相同,则将仅触发一次。 我遇到一个问题,其中源变为空闲状态几分钟,计时器一直在等待下一个水印, 永不触发。如何处理空闲源?
-
在这种情况下,使用处理时间是否是更好的选择?
-
我也有使用Flink v1.8的限制,因此需要有关该版本的一些信息。
预先感谢
解决方法
我不完全理解你的问题;缺少太多上下文。但我可以提供几点:
(1)keyBy
昂贵:它会强制序列化/反序列化以及网络改组。
(2)当且仅当它们具有相同的时间戳和相同的键时,才对重复数据进行重复数据删除。
(3)对于空闲源问题,事件时间计时器最终将在事件再次开始流动时触发,因为这会使水印提前。如果迫不及待,可以使用https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java之类的东西,或者切换到处理时间。