计时器上的Flink Keyedprocess函数用于空闲源

问题描述

我有一个监听Kafka的flink进程。然后,消耗的消息将在并发哈希图中保存一段时间,然后需要下沉到cassandra。

操作员链类似于

DataStream<Message> datastream = KafkaSource.createsource();
DataStream<Message> decodededMessage = datastream.flatmap(new DecodeMessage());

decodedMessage.assigneTimestampsandWatermarks(new AscendingTimestampExtractor<Message>(){

  public long extractAscendingTimestamp(Message m){

      return message.getTimestamp();
  }
  
}).keyBy((KeySelector<Message>) x-> x.getID())
  .process(new Timerfunction())
  .addSink(new MySink());



class TimerFunction extends KeyedProcessFunction<Integer,Message,Message>{

    private ValueState<Message> x;

    public void processElement(){
//some logic to create timestamp for one minute

     context.timerService().registerEventTimeTimer(x.getTimestamp());
     }
  
    public void onTimer()
  // output values on trigger
}


在使用eventime时我有些疑问

  • 消息将具有唯一的ID和时间戳以及其他一些属性。一分钟内可能会有一百万个唯一键。 keyBy操作会影响性能吗?

我需要介绍以下情况

  • ID为1的X消息在8小时1分1秒到达

  • ID为2的Y消息在8小时1分4秒到达

    因为我使用Id作为密钥,所以这两个消息都应将计时器设置为在8小时2分0秒触发。 根据flink文档,如果计时器的时间戳相同,则将仅触发一次。 我遇到一个问题,其中源变为空闲状态几分钟,计时器一直在等待下一个水印, 永不触发。如何处理空闲源?

  • 在这种情况下,使用处理时间是否是更好的选择?

  • 我也有使用Flink v1.8的限制,因此需要有关该版本的一些信息。

预先感谢

解决方法

我不完全理解你的问题;缺少太多上下文。但我可以提供几点:

(1)keyBy昂贵:它会强制序列化/反序列化以及网络改组。

(2)当且仅当它们具有相同的时间戳和相同的键时,才对重复数据进行重复数据删除。

(3)对于空闲源问题,事件时间计时器最终将在事件再次开始流动时触发,因为这会使水印提前。如果迫不及待,可以使用https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java之类的东西,或者切换到处理时间。